diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0579c67d2..dcc9372f5 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -77,6 +77,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc + update/update_sort_order.cc update/update_properties.cc util/bucket_util.cc util/conversions.cc diff --git a/src/iceberg/expression/term.h b/src/iceberg/expression/term.h index 5e834af51..b50ea9242 100644 --- a/src/iceberg/expression/term.h +++ b/src/iceberg/expression/term.h @@ -151,6 +151,8 @@ class ICEBERG_EXPORT BoundReference std::shared_ptr type() const override { return field_.type(); } + int32_t field_id() const { return field_.field_id(); } + bool MayProduceNull() const override { return field_.optional(); } bool Equals(const BoundTerm& other) const override; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 850f65905..1596a2ead 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -100,6 +100,7 @@ iceberg_sources = files( 'type.cc', 'update/pending_update.cc', 'update/update_properties.cc', + 'update/update_sort_order.cc', 'util/bucket_util.cc', 'util/conversions.cc', 'util/decimal.cc', diff --git a/src/iceberg/sort_order.cc b/src/iceberg/sort_order.cc index 71e5e18df..fca138a6c 100644 --- a/src/iceberg/sort_order.cc +++ b/src/iceberg/sort_order.cc @@ -111,7 +111,7 @@ Result> SortOrder::Make(const Schema& schema, int32_t } if (fields.empty() && sort_id != kUnsortedOrderId) [[unlikely]] { - return InvalidArgument("Sort order must have at least one sort field"); + return InvalidArgument("Sort order must have at least one sort field."); } auto sort_order = std::unique_ptr(new SortOrder(sort_id, std::move(fields))); diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index b5a1e582e..38afdedd0 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -154,6 +154,13 @@ Result> Table::NewUpdateProperties() { return transaction->NewUpdateProperties(); } 
+Result> Table::NewUpdateSortOrder() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdateSortOrder(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index efe175828..317aea018 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -132,6 +132,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewUpdateProperties(); + /// \brief Create a new UpdateSortOrder to update the table sort order and commit the + /// changes. + virtual Result> NewUpdateSortOrder(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 40dcb03c0..2d77aef43 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -426,7 +427,7 @@ class TableMetadataBuilder::Impl { Status SetDefaultSortOrder(int32_t order_id); Result AddSortOrder(const SortOrder& order); Status SetProperties(const std::unordered_map& updated); - Status RemoveProperties(const std::vector& removed); + Status RemoveProperties(const std::unordered_set& removed); std::unique_ptr Build(); @@ -590,7 +591,7 @@ Status TableMetadataBuilder::Impl::SetProperties( } Status TableMetadataBuilder::Impl::RemoveProperties( - const std::vector& removed) { + const std::unordered_set& removed) { // If removed is empty, return early (no-op) if (removed.empty()) { return {}; @@ -820,7 +821,7 @@ TableMetadataBuilder& TableMetadataBuilder::SetProperties( } TableMetadataBuilder& TableMetadataBuilder::RemoveProperties( - const std::vector& removed) { + const std::unordered_set& removed) { 
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveProperties(removed)); return *this; } diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index ce7975c6e..daaada6e6 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -394,7 +394,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param removed Set of property keys to remove /// \return Reference to this builder for method chaining - TableMetadataBuilder& RemoveProperties(const std::vector& removed); + TableMetadataBuilder& RemoveProperties(const std::unordered_set& removed); /// \brief Set the table location /// diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index a040cb36c..71db517b8 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include "iceberg/iceberg_export.h" @@ -379,10 +380,10 @@ class ICEBERG_EXPORT SetProperties : public TableUpdate { /// \brief Represents removing table properties class ICEBERG_EXPORT RemoveProperties : public TableUpdate { public: - explicit RemoveProperties(std::vector removed) + explicit RemoveProperties(std::unordered_set removed) : removed_(std::move(removed)) {} - const std::vector& removed() const { return removed_; } + const std::unordered_set& removed() const { return removed_; } void ApplyTo(TableMetadataBuilder& builder) const override; @@ -391,7 +392,7 @@ class ICEBERG_EXPORT RemoveProperties : public TableUpdate { Kind kind() const override { return Kind::kRemoveProperties; } private: - std::vector removed_; + std::unordered_set removed_; }; /// \brief Represents setting the table location diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fef63efee..0d36f64cb 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -154,7 +154,8 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES transaction_test.cc - update_properties_test.cc) + 
update_properties_test.cc + update_sort_order_test.cc) endif() diff --git a/src/iceberg/test/table_requirements_test.cc b/src/iceberg/test/table_requirements_test.cc index 041b44dd1..3278eb883 100644 --- a/src/iceberg/test/table_requirements_test.cc +++ b/src/iceberg/test/table_requirements_test.cc @@ -939,7 +939,7 @@ TEST(TableRequirementsTest, RemoveProperties) { std::vector> updates; updates.push_back( - std::make_unique(std::vector{"test"})); + std::make_unique(std::unordered_set{"test"})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); diff --git a/src/iceberg/test/table_update_test.cc b/src/iceberg/test/table_update_test.cc index afa70894c..041cfcd23 100644 --- a/src/iceberg/test/table_update_test.cc +++ b/src/iceberg/test/table_update_test.cc @@ -175,7 +175,7 @@ INSTANTIATE_TEST_SUITE_P( .update_factory = [] { return std::make_unique( - std::vector{"key"}); + std::unordered_set{"key"}); }, .expected_existing_table_count = 0, .validator = nullptr}, diff --git a/src/iceberg/test/transaction_test.cc b/src/iceberg/test/transaction_test.cc index b8a55d83c..232febc1f 100644 --- a/src/iceberg/test/transaction_test.cc +++ b/src/iceberg/test/transaction_test.cc @@ -19,11 +19,14 @@ #include "iceberg/transaction.h" -#include "iceberg/table.h" -#include "iceberg/table_update.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/term.h" +#include "iceberg/sort_order.h" #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" +#include "iceberg/transform.h" #include "iceberg/update/update_properties.h" +#include "iceberg/update/update_sort_order.h" namespace iceberg { @@ -35,14 +38,6 @@ TEST_F(TransactionTest, CreateTransaction) { EXPECT_EQ(txn->table(), table_); } -TEST_F(TransactionTest, UpdatePropertiesInTransaction) { - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); - - update->Set("key1", 
"value1"); - EXPECT_THAT(update->Apply(), IsOk()); -} - TEST_F(TransactionTest, CommitEmptyTransaction) { ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); EXPECT_THAT(txn->Commit(), IsOk()); @@ -67,24 +62,36 @@ TEST_F(TransactionTest, CommitTransactionWithPropertyUpdate) { TEST_F(TransactionTest, MultipleUpdatesInTransaction) { ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - // First update + // First update: set property ICEBERG_UNWRAP_OR_FAIL(auto update1, txn->NewUpdateProperties()); - update1->Set("key1", "value1"); + update1->Set("key1", "value1").Set("key2", "value2"); EXPECT_THAT(update1->Commit(), IsOk()); - // Second update - ICEBERG_UNWRAP_OR_FAIL(auto update2, txn->NewUpdateProperties()); - update2->Set("key2", "value2"); + // Second update: update sort order + ICEBERG_UNWRAP_OR_FAIL(auto update2, txn->NewUpdateSortOrder()); + auto term = + UnboundTransform::Make(Expressions::Ref("x"), Transform::Identity()).value(); + update2->AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); EXPECT_THAT(update2->Commit(), IsOk()); // Commit transaction ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); - // Verify both properties were set + // Verify properties were set ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); const auto& props = reloaded->properties().configs(); EXPECT_EQ(props.at("key1"), "value1"); EXPECT_EQ(props.at("key2"), "value2"); + + // Verify sort order was updated + ICEBERG_UNWRAP_OR_FAIL(auto sort_order, reloaded->sort_order()); + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ICEBERG_UNWRAP_OR_FAIL( + auto expected_sort_order, + SortOrder::Make(sort_order->order_id(), std::move(expected_fields))); + EXPECT_EQ(*sort_order, *expected_sort_order); } } // namespace iceberg diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 
5350af422..8ac8e5eb2 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -19,7 +19,6 @@ #include "iceberg/update/update_properties.h" -#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" @@ -27,13 +26,21 @@ namespace iceberg { class UpdatePropertiesTest : public UpdateTestBase {}; +TEST_F(UpdatePropertiesTest, EmptyUpdate) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.updates.empty(), true); +} + TEST_F(UpdatePropertiesTest, SetProperty) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); update->Set("key1", "value1").Set("key2", "value2"); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); - EXPECT_EQ(result.updates.size(), 1); - EXPECT_EQ(result.updates[0]->kind(), table::SetProperties::Kind::kSetProperties); + EXPECT_EQ(result.updates.size(), 2); + EXPECT_EQ(result.updates.at("key1"), "value1"); + EXPECT_EQ(result.updates.at("key2"), "value2"); + EXPECT_TRUE(result.removals.empty()); } TEST_F(UpdatePropertiesTest, RemoveProperty) { @@ -48,8 +55,10 @@ TEST_F(UpdatePropertiesTest, RemoveProperty) { update->Remove("key1").Remove("key2"); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); - EXPECT_EQ(result.updates.size(), 1); - EXPECT_EQ(result.updates[0]->kind(), table::RemoveProperties::Kind::kRemoveProperties); + EXPECT_TRUE(result.updates.empty()); + EXPECT_EQ(result.removals.size(), 2); + EXPECT_TRUE(result.removals.contains("key1")); + EXPECT_TRUE(result.removals.contains("key2")); } TEST_F(UpdatePropertiesTest, SetThenRemoveSameKey) { @@ -73,22 +82,23 @@ TEST_F(UpdatePropertiesTest, RemoveThenSetSameKey) { TEST_F(UpdatePropertiesTest, SetAndRemoveDifferentKeys) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); update->Set("key1", "value1").Remove("key2"); - EXPECT_THAT(update->Commit(), IsOk()); - 
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - const auto& props = reloaded->properties().configs(); - EXPECT_EQ(props.at("key1"), "value1"); - EXPECT_FALSE(props.contains("key2")); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.updates.size(), 1); + EXPECT_EQ(result.updates.at("key1"), "value1"); + EXPECT_EQ(result.removals.size(), 1); + EXPECT_TRUE(result.removals.contains("key2")); } TEST_F(UpdatePropertiesTest, UpgradeFormatVersionValid) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); - update->Set("format-version", "2"); + update->Set("format-version", "3"); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); - EXPECT_EQ(result.updates.size(), 1); - EXPECT_EQ(result.updates[0]->kind(), - table::UpgradeFormatVersion::Kind::kUpgradeFormatVersion); + EXPECT_TRUE(result.updates.empty()); + EXPECT_TRUE(result.removals.empty()); + ASSERT_TRUE(result.format_version.has_value()); + EXPECT_EQ(result.format_version.value(), 3); } TEST_F(UpdatePropertiesTest, UpgradeFormatVersionInvalidString) { diff --git a/src/iceberg/test/update_sort_order_test.cc b/src/iceberg/test/update_sort_order_test.cc new file mode 100644 index 000000000..e5f5fc0e9 --- /dev/null +++ b/src/iceberg/test/update_sort_order_test.cc @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_sort_order.h" + +#include +#include + +#include +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/sort_field.h" +#include "iceberg/sort_order.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transform.h" + +namespace iceberg { + +class UpdateSortOrderTest : public UpdateTestBase { + protected: + // Helper function to apply update and verify the resulting sort order + void ApplyAndExpectSortOrder(UpdateSortOrder* update, + std::vector expected_fields) { + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL( + auto expected_sort_order, + SortOrder::Make(result.sort_order->order_id(), std::move(expected_fields))); + EXPECT_EQ(*result.sort_order, *expected_sort_order); + } +}; + +TEST_F(UpdateSortOrderTest, EmptySortOrder) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + // Should succeed with an unsorted order + EXPECT_TRUE(result.sort_order->fields().empty()); +} + +TEST_F(UpdateSortOrderTest, AddSingleSortFieldAscending) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto term = Expressions::Transform("x", Transform::Identity()); + update->AddSortField(term, SortDirection::kAscending, NullOrder::kFirst); + + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kAscending, + 
NullOrder::kFirst); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddSingleSortFieldDescending) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto term = Expressions::Transform("y", Transform::Identity()); + update->AddSortField(term, SortDirection::kDescending, NullOrder::kLast); + + std::vector expected_fields; + expected_fields.emplace_back(2, Transform::Identity(), SortDirection::kDescending, + NullOrder::kLast); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddMultipleSortFields) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto term1 = Expressions::Transform("y", Transform::Identity()); + auto term2 = Expressions::Transform("x", Transform::Identity()); + update->AddSortField(term1, SortDirection::kAscending, NullOrder::kFirst) + .AddSortField(term2, SortDirection::kDescending, NullOrder::kLast); + + std::vector expected_fields; + expected_fields.emplace_back(2, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kDescending, + NullOrder::kLast); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldWithNamedReference) { + // Test that we can directly use NamedReference (kReference) which is treated as + // identity + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto ref = Expressions::Ref("x"); + update->AddSortField(ref, SortDirection::kAscending, NullOrder::kFirst); + + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldByName) { + // Test the convenience method for adding sort field by name + ICEBERG_UNWRAP_OR_FAIL(auto 
update, table_->NewUpdateSortOrder()); + update->AddSortFieldByName("x", SortDirection::kAscending, NullOrder::kFirst); + + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldWithTruncateTransform) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto term = Expressions::Truncate("x", 10); + update->AddSortField(term, SortDirection::kAscending, NullOrder::kFirst); + + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Truncate(10), SortDirection::kAscending, + NullOrder::kFirst); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldWithBucketTransform) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto term = Expressions::Bucket("y", 10); + update->AddSortField(term, SortDirection::kDescending, NullOrder::kLast); + + std::vector expected_fields; + expected_fields.emplace_back(2, Transform::Bucket(10), SortDirection::kDescending, + NullOrder::kLast); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldNullTerm) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + + update->AddSortField(nullptr, SortDirection::kAscending, NullOrder::kFirst); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Term cannot be null")); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldInvalidTransform) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + + // Try to apply day transform to a long field (invalid - day requires date/timestamp) + auto ref = NamedReference::Make("x").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Day()).value(); + + 
update->AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("not a valid input type")); +} + +TEST_F(UpdateSortOrderTest, AddSortFieldNonExistentField) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + + auto ref = NamedReference::Make("nonexistent").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update->AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot find")); +} + +TEST_F(UpdateSortOrderTest, CaseSensitiveTrue) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + + auto ref = NamedReference::Make("X").value(); // Uppercase + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update->CaseSensitive(true).AddSortField(std::move(term), SortDirection::kAscending, + NullOrder::kFirst); + + auto result = update->Apply(); + // Should fail because schema has "x" (lowercase) + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); +} + +TEST_F(UpdateSortOrderTest, CaseSensitiveFalse) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSortOrder()); + auto ref = NamedReference::Make("X").value(); // Uppercase + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + update->CaseSensitive(false).AddSortField(std::move(term), SortDirection::kAscending, + NullOrder::kFirst); + + // Should succeed because case-insensitive matching + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ApplyAndExpectSortOrder(update.get(), std::move(expected_fields)); +} + +TEST_F(UpdateSortOrderTest, CommitSuccess) { + // 
Test empty commit + ICEBERG_UNWRAP_OR_FAIL(auto empty_update, table_->NewUpdateSortOrder()); + EXPECT_THAT(empty_update->Commit(), IsOk()); + + // Reload table after first commit + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + + // Test commit with sort order changes + ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSortOrder()); + auto ref = NamedReference::Make("x").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + update->AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + EXPECT_THAT(update->Commit(), IsOk()); + + // Verify the sort order was committed + ICEBERG_UNWRAP_OR_FAIL(auto final_table, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto sort_order, final_table->sort_order()); + + std::vector expected_fields; + expected_fields.emplace_back(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ICEBERG_UNWRAP_OR_FAIL( + auto expected_sort_order, + SortOrder::Make(sort_order->order_id(), std::move(expected_fields))); + EXPECT_EQ(*sort_order, *expected_sort_order); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index c8cf42d36..49416cdc0 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -19,13 +19,18 @@ */ #include "iceberg/transaction.h" +#include + #include "iceberg/catalog.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" +#include "iceberg/update/pending_update.h" #include "iceberg/update/update_properties.h" +#include "iceberg/update/update_sort_order.h" +#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -60,9 +65,29 @@ Status Transaction::AddUpdate(const std::shared_ptr& update) { return {}; } -Status Transaction::Apply(std::vector> updates) { - for (const 
auto& update : updates) { - update->ApplyTo(*metadata_builder_); +Status Transaction::Apply(PendingUpdate& update) { + switch (update.kind()) { + case PendingUpdate::Kind::kUpdateProperties: { + auto& update_properties = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply()); + if (!result.updates.empty()) { + metadata_builder_->SetProperties(std::move(result.updates)); + } + if (!result.removals.empty()) { + metadata_builder_->RemoveProperties(std::move(result.removals)); + } + if (result.format_version.has_value()) { + metadata_builder_->UpgradeFormatVersion(result.format_version.value()); + } + } break; + case PendingUpdate::Kind::kUpdateSortOrder: { + auto& update_sort_order = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_sort_order.Apply()); + metadata_builder_->SetDefaultSortOrder(result.sort_order); + } break; + default: + return NotSupported("Unsupported pending update: {}", + static_cast(update.kind())); } last_update_committed_ = true; @@ -119,4 +144,11 @@ Result> Transaction::NewUpdateProperties() { return update_properties; } +Result> Transaction::NewUpdateSortOrder() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_sort_order, + UpdateSortOrder::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_sort_order)); + return update_sort_order; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 73c346833..05fdea325 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -60,13 +60,17 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateProperties(); + /// \brief Create a new UpdateSortOrder to update the table sort order and commit the + /// changes. + Result> NewUpdateSortOrder(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit); Status AddUpdate(const std::shared_ptr& update); /// \brief Apply the pending changes to current table. - Status Apply(std::vector> updates); + Status Apply(PendingUpdate& updates); friend class PendingUpdate; // Need to access the Apply method. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 133a7043c..8ca444ccd 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -178,6 +178,7 @@ class Transaction; /// \brief Update family. class PendingUpdate; class UpdateProperties; +class UpdateSortOrder; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 38502b14e..b0ec67066 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -16,6 +16,6 @@ # under the License. install_headers( - ['pending_update.h', 'update_properties.h'], + ['pending_update.h', 'update_sort_order.h', 'update_properties.h'], subdir: 'iceberg/update', ) diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc index 4dbc67884..535e7b41c 100644 --- a/src/iceberg/update/pending_update.cc +++ b/src/iceberg/update/pending_update.cc @@ -19,9 +19,7 @@ #include "iceberg/update/pending_update.h" -#include "iceberg/table_update.h" #include "iceberg/transaction.h" -#include "iceberg/util/macros.h" namespace iceberg { @@ -30,9 +28,6 @@ PendingUpdate::PendingUpdate(std::shared_ptr transaction) PendingUpdate::~PendingUpdate() = default; -Status PendingUpdate::Commit() { - ICEBERG_ASSIGN_OR_RAISE(auto apply_result, Apply()); - return transaction_->Apply(std::move(apply_result.updates)); -} +Status PendingUpdate::Commit() { return transaction_->Apply(*this); } } // namespace iceberg diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index c4618400d..c298ba800 100644 --- 
a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -43,25 +43,12 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: enum class Kind : uint8_t { kUpdateProperties, + kUpdateSortOrder, }; /// \brief Return the kind of this pending update. virtual Kind kind() const = 0; - struct ApplyResult { - std::vector> updates; - }; - - /// \brief Apply the pending changes and return the uncommitted changes for validation. - /// - /// \note This does not result in a permanent update. - /// \return The uncommitted changes that would be committed by calling Commit(), or an - /// error: - /// - ValidationFailed: the pending changes cannot be applied to the current - /// metadata - /// - InvalidArgument: if pending changes are conflicting or invalid - virtual Result Apply() = 0; - /// \brief Apply the pending changes and commit. /// /// \return An OK status if the commit was successful, or an error: diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index 9cdd7b5a7..ce809c437 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -28,7 +28,6 @@ #include "iceberg/result.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" -#include "iceberg/table_update.h" #include "iceberg/transaction.h" #include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" @@ -37,9 +36,8 @@ namespace iceberg { Result> UpdateProperties::Make( std::shared_ptr transaction) { - if (!transaction) [[unlikely]] { - return InvalidArgument("Cannot create UpdateProperties without a transaction"); - } + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateProperties without a transaction"); return std::shared_ptr(new UpdateProperties(std::move(transaction))); } @@ -70,7 +68,7 @@ UpdateProperties& UpdateProperties::Remove(const std::string& key) { return *this; } -Result UpdateProperties::Apply() { +Result UpdateProperties::Apply() 
{ ICEBERG_RETURN_UNEXPECTED(CheckErrors()); const auto& current_props = transaction_->current().properties.configs(); std::unordered_map new_properties; @@ -111,27 +109,8 @@ Result UpdateProperties::Apply() { ICEBERG_RETURN_UNEXPECTED( MetricsConfig::VerifyReferencedColumns(new_properties, *schema.value())); } - - ApplyResult result; - if (!updates_.empty()) { - result.updates.emplace_back(std::make_unique(updates_)); - } - if (!removals_.empty()) { - for (const auto& key : removals_) { - if (current_props.contains(key)) { - removals.push_back(key); - } - } - if (!removals.empty()) { - result.updates.emplace_back(std::make_unique(removals)); - } - } - if (format_version_.has_value()) { - result.updates.emplace_back( - std::make_unique(format_version_.value())); - }; - - return result; + return ApplyResult{ + .updates = updates_, .removals = removals_, .format_version = format_version_}; } } // namespace iceberg diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h index e0f578bea..ec9ab796e 100644 --- a/src/iceberg/update/update_properties.h +++ b/src/iceberg/update/update_properties.h @@ -26,9 +26,13 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" +/// \file iceberg/update/update_properties.h +/// \brief Updates table properties. + namespace iceberg { /// \brief Updates table properties. @@ -39,6 +43,12 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { ~UpdateProperties() override; + struct ApplyResult { + std::unordered_map updates; + std::unordered_set removals; + std::optional format_version; + }; + /// \brief Sets a property key to a specified value. 
/// /// The key must not have been previously marked for removal and reserved property keys @@ -57,7 +67,8 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { Kind kind() const final { return Kind::kUpdateProperties; } - Result Apply() final; + /// \brief Apply the pending changes and return the updates and removals. + Result Apply(); private: explicit UpdateProperties(std::shared_ptr transaction); diff --git a/src/iceberg/update/update_sort_order.cc b/src/iceberg/update/update_sort_order.cc new file mode 100644 index 000000000..c31d2ba74 --- /dev/null +++ b/src/iceberg/update/update_sort_order.cc @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/update_sort_order.h" + +#include +#include +#include + +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/sort_order.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +// Factory for UpdateSortOrder. A null transaction fails the precheck below; otherwise +// the transaction handle is moved into the newly created instance. +Result> UpdateSortOrder::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateSortOrder without a transaction"); + return std::shared_ptr(new UpdateSortOrder(std::move(transaction))); +} + +UpdateSortOrder::UpdateSortOrder(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +UpdateSortOrder::~UpdateSortOrder() = default; + +// Resolves an unbound term against the transaction's current schema and appends the +// resulting sort field. The term is bound right here with the current case_sensitive_ +// value, so CaseSensitive() has to be called before the fields it should affect. +// NOTE(review): the ICEBERG_BUILDER_* macros presumably record the failure and return +// *this early instead of throwing -- confirm against iceberg/util/macros.h. +UpdateSortOrder& UpdateSortOrder::AddSortField(const std::shared_ptr& term, + SortDirection direction, + NullOrder null_order) { + ICEBERG_BUILDER_CHECK(term != nullptr, "Term cannot be null"); + ICEBERG_BUILDER_CHECK(term->is_unbound(), "Term must be unbound"); + + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, transaction_->current().Schema()); + if (term->kind() == Term::Kind::kReference) { + // kReference is treated as identity transform + auto named_ref = internal::checked_pointer_cast(term); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, + named_ref->Bind(*schema, case_sensitive_)); + sort_fields_.emplace_back(bound_ref->field_id(), Transform::Identity(), direction, + null_order); + } else if (term->kind() == Term::Kind::kTransform) { + // Binding supplies the source field id; the sort field keeps the transform taken + // from the unbound term. + auto unbound_transform = internal::checked_pointer_cast(term); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_term, + unbound_transform->Bind(*schema, case_sensitive_)); + sort_fields_.emplace_back(bound_term->reference()->field_id(), + unbound_transform->transform(), direction, null_order); + } else { + // Any other term kind is reported through AddError as kNotSupported. + return AddError(ErrorKind::kNotSupported, "Not supported unbound term: {}", + static_cast(term->kind())); + } + + return *this; +} + 
+// Convenience wrapper: turns `name` into a NamedReference and delegates to AddSortField, +// which sorts a plain reference with the identity transform. +UpdateSortOrder& UpdateSortOrder::AddSortFieldByName(std::string_view name, + SortDirection direction, + NullOrder null_order) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto named_ref, + NamedReference::Make(std::string(name))); + return AddSortField(std::move(named_ref), direction, null_order); +} + +// Stores the flag that AddSortField passes to Bind(); sort fields that were already +// added are not re-bound. +UpdateSortOrder& UpdateSortOrder::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + return *this; +} + +// Builds the sort order described by the added fields; nothing is committed here. +// Fails when CheckErrors() reports an error, when the order cannot be created, or when +// it does not validate against the transaction's current schema. +Result UpdateSortOrder::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + // If no sort fields are specified, return an unsorted order (ID = 0). + std::shared_ptr order; + if (sort_fields_.empty()) { + order = SortOrder::Unsorted(); + } else { + // Use -1 as a placeholder for non-empty sort orders. + // The actual sort order ID will be assigned by TableMetadataBuilder when + // the AddSortOrder update is applied. + ICEBERG_ASSIGN_OR_RAISE(order, SortOrder::Make(/*sort_id=*/-1, sort_fields_)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, transaction_->current().Schema()); + ICEBERG_RETURN_UNEXPECTED(order->Validate(*schema)); + } + return ApplyResult{std::move(order)}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_sort_order.h b/src/iceberg/update/update_sort_order.h new file mode 100644 index 000000000..7983513b6 --- /dev/null +++ b/src/iceberg/update/update_sort_order.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/expression/term.h" +#include "iceberg/sort_field.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_sort_order.h +/// \brief Updates the table sort order. + +namespace iceberg { + +/// \brief Updating table sort order with a newly created order. +/// +/// Sort fields are collected through AddSortField()/AddSortFieldByName() in call order; +/// Apply() then builds the new SortOrder from them. +class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate { + public: + /// \brief Create an UpdateSortOrder operating on the given transaction. + /// + /// \param transaction The transaction to update; must not be null + static Result> Make( + std::shared_ptr transaction); + + ~UpdateSortOrder() override; + + /// \brief Outcome of Apply(). + struct ApplyResult { + /// The sort order built from the added fields; the unsorted order when none were added. + std::shared_ptr sort_order; + }; + + /// \brief Add a sort field to the sort order. + /// + /// \param term An unbound term referencing the field: either a named reference (sorted + /// with the identity transform) or an unbound transform + /// \param direction The sort direction (ascending or descending) + /// \param null_order The null order (first or last) + /// \return Reference to this UpdateSortOrder for chaining + UpdateSortOrder& AddSortField(const std::shared_ptr& term, + SortDirection direction, NullOrder null_order); + + /// \brief Add a sort field by field name with identity transform. + /// + /// \param name The name of the field to sort by + /// \param direction The sort direction (ascending or descending) + /// \param null_order The null order (first or last) + /// \return Reference to this UpdateSortOrder for chaining + UpdateSortOrder& AddSortFieldByName(std::string_view name, SortDirection direction, + NullOrder null_order); + + /// \brief Set case sensitivity of sort column name resolution.
+ /// + /// Column names are resolved at the moment a sort field is added, so this setting only + /// affects fields added after this call. + /// + /// \param case_sensitive When true, column name resolution is case-sensitive + /// \return Reference to this UpdateSortOrder for chaining + UpdateSortOrder& CaseSensitive(bool case_sensitive); + + /// \brief Identify this pending update as a sort order update. + Kind kind() const final { return Kind::kUpdateSortOrder; } + + /// \brief Apply the pending changes and return the new SortOrder. + /// + /// Returns the unsorted order when no sort field was added; otherwise the new order is + /// validated against the current schema of the transaction. + Result Apply(); + + private: + explicit UpdateSortOrder(std::shared_ptr transaction); + + /// Sort fields in the order they were added. + std::vector sort_fields_; + /// Whether names are matched case-sensitively when a field is added (default: true). + bool case_sensitive_ = true; +}; + +} // namespace iceberg