diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index a0d93967f..37cbec1cf 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -75,6 +75,7 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + update/replace_sort_order.cc update/update_properties.cc util/bucket_util.cc util/conversions.cc diff --git a/src/iceberg/expression/term.h b/src/iceberg/expression/term.h index 5e834af51..b50ea9242 100644 --- a/src/iceberg/expression/term.h +++ b/src/iceberg/expression/term.h @@ -151,6 +151,8 @@ class ICEBERG_EXPORT BoundReference std::shared_ptr type() const override { return field_.type(); } + int32_t field_id() const { return field_.field_id(); } + bool MayProduceNull() const override { return field_.optional(); } bool Equals(const BoundTerm& other) const override; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index d473d72e1..e255d063d 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -97,6 +97,7 @@ iceberg_sources = files( 'transform.cc', 'transform_function.cc', 'type.cc', + 'update/replace_sort_order.cc', 'update/update_properties.cc', 'util/bucket_util.cc', 'util/conversions.cc', @@ -196,7 +197,6 @@ install_headers( 'transform.h', 'type_fwd.h', 'type.h', - 'update/update_properties.h', ], subdir: 'iceberg', ) @@ -205,6 +205,7 @@ subdir('catalog') subdir('expression') subdir('manifest') subdir('row') +subdir('update') subdir('util') if get_option('tests').enabled() diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 4e814e027..1c6f7817c 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -758,6 +759,10 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveEncryptionKey(std::string_view throw IcebergError(std::format("{} not implemented", __FUNCTION__)); } +const std::vector>& TableMetadataBuilder::Changes() const { + return impl_->changes; +} + Result> TableMetadataBuilder::Build() { // 1. Check for accumulated errors ICEBERG_RETURN_UNEXPECTED(CheckErrors()); diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index f428fd34f..ccc3b4f6e 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -419,6 +419,13 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \return A Result containing the constructed TableMetadata or an error Result> Build(); + /// \brief Get the list of changes made to the table metadata + /// + /// \return A vector of unique pointers to TableUpdates representing the changes + /// \note We perform error checks and validate metadata consistency in Build(), so call + /// Build() before calling Changes(). + const std::vector>& Changes() const; + /// \brief Destructor ~TableMetadataBuilder() override; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 2c4e0f512..f43dd7015 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -71,6 +71,7 @@ add_iceberg_test(table_test SOURCES json_internal_test.cc metrics_config_test.cc + replace_sort_order_test.cc schema_json_test.cc table_test.cc table_metadata_builder_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index f12b43afa..d3a796c85 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -48,6 +48,7 @@ iceberg_tests = { 'sources': files( 'json_internal_test.cc', 'metrics_config_test.cc', + 'replace_sort_order_test.cc', 'schema_json_test.cc', 'table_metadata_builder_test.cc', 'table_requirement_test.cc', diff --git a/src/iceberg/test/replace_sort_order_test.cc b/src/iceberg/test/replace_sort_order_test.cc new file mode 100644 index 000000000..c27273166 --- /dev/null +++ b/src/iceberg/test/replace_sort_order_test.cc @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/replace_sort_order.h" + +#include +#include +#include + +#include +#include + +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/sort_field.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/transform.h" + +namespace iceberg { + +class ReplaceSortOrderTest : public ::testing::Test { + protected: + void SetUp() override { + // Create a simple schema with various field types + SchemaField field1(1, "id", std::make_shared(), false); + SchemaField field2(2, "name", std::make_shared(), true); + SchemaField field3(3, "ts", std::make_shared(), true); + SchemaField field4(4, "price", std::make_shared(10, 2), true); + schema_ = std::make_shared( + std::vector{field1, field2, field3, field4}, 1); + + // Create basic table metadata + metadata_ = std::make_shared(); + metadata_->schemas.push_back(schema_); + metadata_->current_schema_id = 1; + + // Create catalog and table identifier + catalog_ = std::make_shared(); + identifier_ = TableIdentifier(Namespace({"test"}), "table"); + } + + std::shared_ptr schema_; + std::shared_ptr metadata_; + std::shared_ptr catalog_; + TableIdentifier identifier_; +}; + +TEST_F(ReplaceSortOrderTest, AscendingSingleField) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("id").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update.AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); + + auto sort_order = update.GetBuiltSortOrder(); + ASSERT_NE(sort_order, nullptr); + EXPECT_EQ(sort_order->fields().size(), 1); + + const auto& field = sort_order->fields()[0]; + EXPECT_EQ(field.source_id(), 1); + EXPECT_EQ(field.direction(), SortDirection::kAscending); + EXPECT_EQ(field.null_order(), NullOrder::kFirst); +} + +TEST_F(ReplaceSortOrderTest, DescendingSingleField) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("name").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update.AddSortField(std::move(term), SortDirection::kDescending, NullOrder::kLast); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); + + auto sort_order = update.GetBuiltSortOrder(); + ASSERT_NE(sort_order, nullptr); + EXPECT_EQ(sort_order->fields().size(), 1); + + const auto& field = sort_order->fields()[0]; + EXPECT_EQ(field.source_id(), 2); + EXPECT_EQ(field.direction(), SortDirection::kDescending); + EXPECT_EQ(field.null_order(), NullOrder::kLast); +} + +TEST_F(ReplaceSortOrderTest, MultipleFields) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref1 = NamedReference::Make("name").value(); + auto term1 = UnboundTransform::Make(std::move(ref1), Transform::Identity()).value(); + + auto ref2 = NamedReference::Make("id").value(); + auto term2 = UnboundTransform::Make(std::move(ref2), Transform::Identity()).value(); + + update.AddSortField(std::move(term1), SortDirection::kAscending, NullOrder::kFirst) + .AddSortField(std::move(term2), SortDirection::kDescending, NullOrder::kLast); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); + + auto sort_order = update.GetBuiltSortOrder(); + ASSERT_NE(sort_order, nullptr); + EXPECT_EQ(sort_order->fields().size(), 2); + + // Check first field + const auto& field1 = sort_order->fields()[0]; + EXPECT_EQ(field1.source_id(), 2); // name field + EXPECT_EQ(field1.direction(), SortDirection::kAscending); + EXPECT_EQ(field1.null_order(), NullOrder::kFirst); + + // Check second field + const auto& field2 = sort_order->fields()[1]; + EXPECT_EQ(field2.source_id(), 1); // id field + EXPECT_EQ(field2.direction(), SortDirection::kDescending); + EXPECT_EQ(field2.null_order(), NullOrder::kLast); +} + +TEST_F(ReplaceSortOrderTest, WithTransform) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("ts").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Day()).value(); + + update.AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); + + auto sort_order = update.GetBuiltSortOrder(); + ASSERT_NE(sort_order, nullptr); + EXPECT_EQ(sort_order->fields().size(), 1); + + const auto& field = sort_order->fields()[0]; + EXPECT_EQ(field.source_id(), 3); + EXPECT_EQ(field.transform()->ToString(), "day"); + EXPECT_EQ(field.direction(), SortDirection::kAscending); + EXPECT_EQ(field.null_order(), NullOrder::kFirst); +} + +TEST_F(ReplaceSortOrderTest, WithBucketTransform) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("name").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Bucket(10)).value(); + + update.AddSortField(std::move(term), SortDirection::kDescending, NullOrder::kLast); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); + + auto sort_order = update.GetBuiltSortOrder(); + ASSERT_NE(sort_order, nullptr); + EXPECT_EQ(sort_order->fields().size(), 1); + + const auto& field = sort_order->fields()[0]; + EXPECT_EQ(field.source_id(), 2); + EXPECT_EQ(field.transform()->ToString(), "bucket[10]"); + EXPECT_EQ(field.direction(), SortDirection::kDescending); + EXPECT_EQ(field.null_order(), NullOrder::kLast); +} + +TEST_F(ReplaceSortOrderTest, NullTerm) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + update.AddSortField(nullptr, SortDirection::kAscending, NullOrder::kFirst); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Term cannot be null")); +} + +TEST_F(ReplaceSortOrderTest, InvalidTransformForType) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + // Try to apply day transform to a string field (invalid) + auto ref = NamedReference::Make("name").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Day()).value(); + + update.AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("not a valid input type")); +} + +TEST_F(ReplaceSortOrderTest, NonExistentField) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("nonexistent").value(); + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update.AddSortField(std::move(term), SortDirection::kAscending, NullOrder::kFirst); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot find")); +} + +TEST_F(ReplaceSortOrderTest, CaseSensitiveTrue) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("ID").value(); // Uppercase + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update.CaseSensitive(true).AddSortField(std::move(term), SortDirection::kAscending, + NullOrder::kFirst); + + auto result = update.Apply(); + // Should fail because schema has "id" (lowercase) + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); +} + +TEST_F(ReplaceSortOrderTest, CaseSensitiveFalse) { + ReplaceSortOrder update(identifier_, catalog_, metadata_); + + auto ref = NamedReference::Make("ID").value(); // Uppercase + auto term = UnboundTransform::Make(std::move(ref), Transform::Identity()).value(); + + update.CaseSensitive(false).AddSortField(std::move(term), SortDirection::kAscending, + NullOrder::kFirst); + + auto result = update.Apply(); + // Should succeed because case-insensitive matching + EXPECT_THAT(result, IsOk()); + + auto sort_order = update.GetBuiltSortOrder(); + ASSERT_NE(sort_order, nullptr); + EXPECT_EQ(sort_order->fields().size(), 1); + EXPECT_EQ(sort_order->fields()[0].source_id(), 1); +} + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0e1867f60..5b7c616fc 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -160,9 +160,8 @@ class TableMetadataBuilder; class TableUpdateContext; class PendingUpdate; -template -class PendingUpdateTyped; class UpdateProperties; +class ReplaceSortOrder; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build new file mode 100644 index 000000000..7134fa3d5 --- /dev/null +++ b/src/iceberg/update/meson.build @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +install_headers( + ['replace_sort_order.h', 'update_properties.h'], + subdir: 'iceberg/update', +) diff --git a/src/iceberg/update/replace_sort_order.cc b/src/iceberg/update/replace_sort_order.cc new file mode 100644 index 000000000..82cefb50a --- /dev/null +++ b/src/iceberg/update/replace_sort_order.cc @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/replace_sort_order.h" + +#include +#include +#include + +#include "iceberg/catalog.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/sort_field.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_requirements.h" +#include "iceberg/transform.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +ReplaceSortOrder::ReplaceSortOrder(TableIdentifier identifier, + std::shared_ptr catalog, + std::shared_ptr base) + : identifier_(std::move(identifier)), + catalog_(std::move(catalog)), + base_metadata_(std::move(base)) {} + +ReplaceSortOrder& ReplaceSortOrder::AddSortField(std::shared_ptr term, + SortDirection direction, + NullOrder null_order) { + if (!term) { + return AddError(ErrorKind::kInvalidArgument, "Term cannot be null"); + } + if (term->kind() != Term::Kind::kTransform) { + return AddError(ErrorKind::kInvalidArgument, "Term must be a transform term"); + } + if (!term->is_unbound()) { + return AddError(ErrorKind::kInvalidArgument, "Term must be unbound"); + } + // use checked-cast to get UnboundTransform + auto unbound_transform = internal::checked_pointer_cast(term); + + BUILDER_ASSIGN_OR_RETURN(auto schema, GetSchema()); + BUILDER_ASSIGN_OR_RETURN(auto bound_term, + unbound_transform->Bind(*schema, case_sensitive_)); + + int32_t source_id = bound_term->reference()->field_id(); + sort_fields_.emplace_back(source_id, unbound_transform->transform(), direction, + null_order); + return *this; +} + +ReplaceSortOrder& ReplaceSortOrder::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + return *this; +} + +Status ReplaceSortOrder::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + // Note: We use kInitialSortOrderId (1) here like the Java implementation. + // The actual sort order ID will be assigned by TableMetadataBuilder when + // the AddSortOrder update is applied. + ICEBERG_ASSIGN_OR_RAISE(auto order, + SortOrder::Make(SortOrder::kInitialSortOrderId, sort_fields_)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, GetSchema()); + ICEBERG_RETURN_UNEXPECTED(order->Validate(*schema)); + built_sort_order_ = std::move(order); + return {}; +} + +Status ReplaceSortOrder::Commit() { + ICEBERG_RETURN_UNEXPECTED(Apply()); + + // Use TableMetadataBuilder to generate the changes + auto builder = TableMetadataBuilder::BuildFrom(base_metadata_.get()); + builder->SetDefaultSortOrder(built_sort_order_); + + // Call Build() for error checks and validate metadata consistency. We simply ignore the + // returned Tablemetadata here, this may need to be optimized in the future. + ICEBERG_RETURN_UNEXPECTED(builder->Build()); + const auto& updates = builder->Changes(); + if (!updates.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto requirements, + TableRequirements::ForUpdateTable(*base_metadata_, updates)); + ICEBERG_RETURN_UNEXPECTED(catalog_->UpdateTable(identifier_, requirements, updates)); + } + + return {}; +} + +std::shared_ptr ReplaceSortOrder::GetBuiltSortOrder() const { + return built_sort_order_; +} + +Result> ReplaceSortOrder::GetSchema() { + if (!cached_schema_) { + ICEBERG_ASSIGN_OR_RAISE(cached_schema_, base_metadata_->Schema()); + } + return cached_schema_; +} + +} // namespace iceberg diff --git a/src/iceberg/update/replace_sort_order.h b/src/iceberg/update/replace_sort_order.h new file mode 100644 index 000000000..dcc24451d --- /dev/null +++ b/src/iceberg/update/replace_sort_order.h @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/expression/term.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/pending_update.h" +#include "iceberg/sort_field.h" +#include "iceberg/sort_order.h" +#include "iceberg/table_identifier.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Replacing table sort order with a newly created order. +class ICEBERG_EXPORT ReplaceSortOrder : public PendingUpdate { + public: + /// \brief Constructs a ReplaceSortOrder for the specified table. + /// + /// \param identifier The table identifier + /// \param catalog The catalog containing the table + /// \param metadata The current table metadata + ReplaceSortOrder(TableIdentifier identifier, std::shared_ptr catalog, + std::shared_ptr base); + + /// \brief Add a sort field to the sort order. + /// + /// \param term A transform term referencing the field + /// \param direction The sort direction (ascending or descending) + /// \param null_order The null order (first or last) + /// \return Reference to this ReplaceSortOrder for chaining + ReplaceSortOrder& AddSortField(std::shared_ptr term, SortDirection direction, + NullOrder null_order); + + /// \brief Set case sensitivity of sort column name resolution. + /// + /// \param case_sensitive When true, column name resolution is case-sensitive + /// \return Reference to this ReplaceSortOrder for chaining + ReplaceSortOrder& CaseSensitive(bool case_sensitive); + + /// \brief Applies the sort order changes without committing them. + /// + /// Validates the pending sort order changes but does not commit them to the table. + /// This method can be used to validate changes before actually committing them. + /// + /// \return Status::OK if the changes are valid, or an error if validation fails + Status Apply() override; + + /// \brief Commits the sort order changes to the table. + /// + /// Validates the changes and applies them to the table through the catalog. + /// + /// \return Status::OK if the changes are valid and committed successfully, or an error + Status Commit() override; + + /// \brief Get the built sort order after applying changes. + /// + /// \return The built SortOrder object. + std::shared_ptr GetBuiltSortOrder() const; + + private: + /// \brief Get the schema, loading and caching it on first access. + Result> GetSchema(); + + TableIdentifier identifier_; + std::shared_ptr catalog_; + std::shared_ptr base_metadata_; + + std::vector sort_fields_; + bool case_sensitive_ = true; + std::shared_ptr built_sort_order_; + + // Cached schema to avoid repeated lookups + std::shared_ptr cached_schema_; +}; + +} // namespace iceberg