diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0579c67d2..d116b9cdb 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -77,6 +77,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc + update/update_partition_spec.cc update/update_properties.cc util/bucket_util.cc util/conversions.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 850f65905..da24e7256 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -99,6 +99,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/pending_update.cc', + 'update/update_partition_spec.cc', 'update/update_properties.cc', 'util/bucket_util.cc', 'util/conversions.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index b5a1e582e..aa5e953fc 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,6 +19,8 @@ #include "iceberg/table.h" +#include + #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" @@ -28,6 +30,7 @@ #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" #include "iceberg/transaction.h" +#include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -154,6 +157,13 @@ Result> Table::NewUpdateProperties() { return transaction->NewUpdateProperties(); } +Result> Table::NewUpdateSpec() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdateSpec(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index efe175828..788441212 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -132,6 +132,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. 
virtual Result> NewUpdateProperties(); + /// \brief Create a new UpdatePartitionSpec to update the partition spec of this table + /// and commit the changes. + virtual Result> NewUpdateSpec(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 28178b883..145e01b80 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -153,6 +153,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES transaction_test.cc + update_partition_spec_test.cc update_properties_test.cc) endif() diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc new file mode 100644 index 000000000..536b9e26d --- /dev/null +++ b/src/iceberg/test/update_partition_spec_test.cc @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/update_partition_spec.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/catalog/memory/in_memory_catalog.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_update.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +namespace { + +// Helper to assert partition spec equality +void AssertPartitionSpecEquals(const PartitionSpec& expected, + const PartitionSpec& actual) { + ASSERT_EQ(expected.fields().size(), actual.fields().size()); + for (size_t i = 0; i < expected.fields().size(); ++i) { + const auto& expected_field = expected.fields()[i]; + const auto& actual_field = actual.fields()[i]; + EXPECT_EQ(expected_field.source_id(), actual_field.source_id()); + EXPECT_EQ(expected_field.field_id(), actual_field.field_id()); + EXPECT_EQ(expected_field.name(), actual_field.name()); + EXPECT_EQ(*expected_field.transform(), *actual_field.transform()); + } +} + +// Helper to create base metadata with a specific partition spec +std::unique_ptr CreateBaseMetadata(int8_t format_version, + std::shared_ptr schema, + std::shared_ptr spec) { + auto metadata = std::make_unique(); + metadata->format_version = format_version; + metadata->table_uuid = "test-uuid-1234"; + metadata->location = "/warehouse/test_table"; + metadata->last_sequence_number = 0; + metadata->last_updated_ms = TimePointMs{std::chrono::milliseconds(1000)}; + metadata->last_column_id = 4; + metadata->current_schema_id = 0; + metadata->schemas.push_back(std::move(schema)); + metadata->default_spec_id 
= spec->spec_id(); + metadata->last_partition_id = spec->last_assigned_field_id(); + metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; + metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->sort_orders.push_back(SortOrder::Unsorted()); + metadata->next_row_id = TableMetadata::kInitialRowId; + metadata->properties = TableProperties::default_properties(); + metadata->partition_specs.push_back(std::move(spec)); + return metadata; +} + +// Helper to extract spec from ApplyResult +Result> GetAppliedSpec( + const PendingUpdate::ApplyResult& result) { + if (result.updates.empty()) { + return InvalidArgument("No updates in ApplyResult"); + } + auto* add_spec = dynamic_cast(result.updates[0].get()); + if (add_spec == nullptr) { + return InvalidArgument("First update is not AddPartitionSpec"); + } + return add_spec->spec(); +} + +} // namespace + +class UpdatePartitionSpecTest : public ::testing::TestWithParam { + protected: + void SetUp() override { + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + catalog_ = InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", {}); + format_version_ = GetParam(); + test_schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int64()), + SchemaField::MakeRequired(2, "ts", timestamp_tz()), + SchemaField::MakeRequired(3, "category", string()), + SchemaField::MakeOptional(4, "data", string())}, + 0); + + // Create unpartitioned and partitioned specs matching Java test + ICEBERG_UNWRAP_OR_FAIL( + auto unpartitioned_spec, + PartitionSpec::Make(PartitionSpec::kInitialSpecId, std::vector{}, + PartitionSpec::kLegacyPartitionDataIdStart - 1)); + ICEBERG_UNWRAP_OR_FAIL( + partitioned_spec_, + PartitionSpec::Make( + PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002)); + + auto 
partitioned_metadata = + CreateBaseMetadata(format_version_, test_schema_, partitioned_spec_); + auto unpartitioned_metadata = + CreateBaseMetadata(format_version_, test_schema_, std::move(unpartitioned_spec)); + + // Write metadata files + partitioned_metadata->location = partitioned_table_location_; + unpartitioned_metadata->location = unpartitioned_table_location_; + + // Arrow MockFS cannot automatically create directories. + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(partitioned_table_location_ + "/metadata").ok()); + ASSERT_TRUE(arrow_fs->CreateDir(unpartitioned_table_location_ + "/metadata").ok()); + + // Write table metadata to the table location. + std::string partitioned_metadata_location = + std::format("{}/metadata/00001-{}.metadata.json", partitioned_table_location_, + Uuid::GenerateV7().ToString()); + std::string unpartitioned_metadata_location = + std::format("{}/metadata/00001-{}.metadata.json", unpartitioned_table_location_, + Uuid::GenerateV7().ToString()); + + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, partitioned_metadata_location, + *partitioned_metadata), + IsOk()); + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, unpartitioned_metadata_location, + *unpartitioned_metadata), + IsOk()); + + // Register the tables in the catalog. 
+ ICEBERG_UNWRAP_OR_FAIL( + partitioned_table_, + catalog_->RegisterTable(partitioned_table_ident_, partitioned_metadata_location)); + ICEBERG_UNWRAP_OR_FAIL(unpartitioned_table_, + catalog_->RegisterTable(unpartitioned_table_ident_, + unpartitioned_metadata_location)); + } + + // Helper to create UpdatePartitionSpec with a specific partition spec + std::shared_ptr CreateUpdatePartitionSpec(bool partitioned) { + if (partitioned) { + return partitioned_table_->NewUpdateSpec().value(); + } else { + return unpartitioned_table_->NewUpdateSpec().value(); + } + } + + // Helper to create an expected partition spec + std::shared_ptr MakeExpectedSpec( + const std::vector& fields, int32_t last_assigned_field_id) { + auto spec_result = PartitionSpec::Make(PartitionSpec::kInitialSpecId, fields, + last_assigned_field_id); + if (!spec_result.has_value()) { + ADD_FAILURE() << "Failed to create expected spec: " << spec_result.error().message; + return nullptr; + } + return std::shared_ptr(spec_result.value().release()); + } + + // Helper to apply update and get the resulting spec + std::shared_ptr ApplyUpdateAndGetSpec( + std::shared_ptr update) { + auto result = update->Apply(); + if (!result.has_value()) { + ADD_FAILURE() << "Failed to apply update: " << result.error().message; + return nullptr; + } + auto spec_result = GetAppliedSpec(result.value()); + if (!spec_result.has_value()) { + ADD_FAILURE() << "Failed to get applied spec: " << spec_result.error().message; + return nullptr; + } + return spec_result.value(); + } + + // Helper to apply update and assert spec equality + void ApplyUpdateAndAssertSpec(std::shared_ptr update, + const std::vector& expected_fields, + int32_t last_assigned_field_id) { + auto updated_spec = ApplyUpdateAndGetSpec(update); + auto expected_spec = MakeExpectedSpec(expected_fields, last_assigned_field_id); + AssertPartitionSpecEquals(*expected_spec, *updated_spec); + } + + // Helper to expect an error with a specific message + void 
ExpectError(std::shared_ptr update, ErrorKind expected_kind, + const std::string& expected_message) { + auto result = update->Apply(); + ASSERT_THAT(result, IsError(expected_kind)); + ASSERT_THAT(result, HasErrorMessage(expected_message)); + } + + // Helper to create a table with a custom partition spec + std::shared_ptr
CreateTableWithSpec(std::shared_ptr spec, + const std::string& table_name) { + auto metadata = CreateBaseMetadata(format_version_, test_schema_, spec); + TableIdentifier identifier{.ns = Namespace{.levels = {"test"}}, .name = table_name}; + std::string metadata_location = + std::format("/warehouse/{}/metadata/00000-{}.metadata.json", table_name, + Uuid::GenerateV7().ToString()); + auto table_result = + Table::Make(identifier, std::shared_ptr(metadata.release()), + metadata_location, file_io_, catalog_); + if (!table_result.has_value()) { + ADD_FAILURE() << "Failed to create table: " << table_result.error().message; + return nullptr; + } + return table_result.value(); + } + + // Helper to create UpdatePartitionSpec from a table + std::shared_ptr CreateUpdateFromTable( + std::shared_ptr
table) { + auto transaction_result = + Transaction::Make(table, Transaction::Kind::kUpdate, /*auto_commit=*/false); + if (!transaction_result.has_value()) { + ADD_FAILURE() << "Failed to create transaction: " + << transaction_result.error().message; + return nullptr; + } + auto update_result = UpdatePartitionSpec::Make(transaction_result.value()); + if (!update_result.has_value()) { + ADD_FAILURE() << "Failed to create UpdatePartitionSpec: " + << update_result.error().message; + return nullptr; + } + return update_result.value(); + } + + const TableIdentifier partitioned_table_ident_{.name = "partitioned_table"}; + const TableIdentifier unpartitioned_table_ident_{.name = "unpartitioned_table"}; + const std::string partitioned_table_location_{"/warehouse/partitioned_table"}; + const std::string unpartitioned_table_location_{"/warehouse/unpartitioned_table"}; + std::shared_ptr file_io_; + std::shared_ptr catalog_; + std::shared_ptr test_schema_; + std::shared_ptr partitioned_spec_; + std::shared_ptr
partitioned_table_; + std::shared_ptr
unpartitioned_table_; + int8_t format_version_; +}; + +INSTANTIATE_TEST_SUITE_P(FormatVersions, UpdatePartitionSpecTest, ::testing::Values(1, 2), + [](const ::testing::TestParamInfo& info) { + return std::format("V{}", info.param); + }); + +TEST_P(UpdatePartitionSpecTest, TestAddIdentityByName) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField("category"); + ApplyUpdateAndAssertSpec( + update, {PartitionField(3, 1000, "category", Transform::Identity())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddIdentityByTerm) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Ref("category")); + ApplyUpdateAndAssertSpec( + update, {PartitionField(3, 1000, "category", Transform::Identity())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddYear) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Year("ts")); + ApplyUpdateAndAssertSpec(update, + {PartitionField(2, 1000, "ts_year", Transform::Year())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddMonth) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Month("ts")); + ApplyUpdateAndAssertSpec( + update, {PartitionField(2, 1000, "ts_month", Transform::Month())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDay) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Day("ts")); + ApplyUpdateAndAssertSpec(update, {PartitionField(2, 1000, "ts_day", Transform::Day())}, + 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddHour) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Hour("ts")); + ApplyUpdateAndAssertSpec(update, + {PartitionField(2, 1000, "ts_hour", Transform::Hour())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddBucket) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Bucket("id", 16)); + ApplyUpdateAndAssertSpec( + update, {PartitionField(1, 1000, "id_bucket_16", 
Transform::Bucket(16))}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddTruncate) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Truncate("data", 4)); + ApplyUpdateAndAssertSpec( + update, {PartitionField(4, 1000, "data_trunc_4", Transform::Truncate(4))}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddNamedPartition) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField(Expressions::Bucket("id", 16), "shard"); + ApplyUpdateAndAssertSpec( + update, {PartitionField(1, 1000, "shard", Transform::Bucket(16))}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddToExisting) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Truncate("data", 4)); + ApplyUpdateAndAssertSpec( + update, + {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16)), + PartitionField(4, 1003, "data_trunc_4", Transform::Truncate(4))}, + 1003); +} + +TEST_P(UpdatePartitionSpecTest, TestMultipleAdds) { + auto update = CreateUpdatePartitionSpec(false); + update->AddField("category") + .AddField(Expressions::Day("ts")) + .AddField(Expressions::Bucket("id", 16), "shard") + .AddField(Expressions::Truncate("data", 4), "prefix"); + ApplyUpdateAndAssertSpec(update, + {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16)), + PartitionField(4, 1003, "prefix", Transform::Truncate(4))}, + 1003); +} + +TEST_P(UpdatePartitionSpecTest, TestAddHourToDay) { + // First add day partition + auto update1 = CreateUpdatePartitionSpec(false); + update1->AddField(Expressions::Day("ts")); + auto by_day_spec = ApplyUpdateAndGetSpec(update1); + + // Then add hour partition + auto table = CreateTableWithSpec(by_day_spec, "test_table"); + auto update2 = CreateUpdateFromTable(table); + 
update2->AddField(Expressions::Hour("ts")); + auto by_hour_spec = ApplyUpdateAndGetSpec(update2); + + ASSERT_EQ(by_hour_spec->fields().size(), 2); + EXPECT_EQ(by_hour_spec->fields()[0].source_id(), 2); + EXPECT_EQ(by_hour_spec->fields()[0].name(), "ts_day"); + EXPECT_EQ(*by_hour_spec->fields()[0].transform(), *Transform::Day()); + EXPECT_EQ(by_hour_spec->fields()[1].source_id(), 2); + EXPECT_EQ(by_hour_spec->fields()[1].name(), "ts_hour"); + EXPECT_EQ(*by_hour_spec->fields()[1].transform(), *Transform::Hour()); +} + +TEST_P(UpdatePartitionSpecTest, TestAddMultipleBuckets) { + // First add bucket 16 + auto update1 = CreateUpdatePartitionSpec(false); + update1->AddField(Expressions::Bucket("id", 16)); + auto bucket16_spec = ApplyUpdateAndGetSpec(update1); + + // Then add bucket 8 + auto table = CreateTableWithSpec(bucket16_spec, "test_table"); + auto update2 = CreateUpdateFromTable(table); + update2->AddField(Expressions::Bucket("id", 8)); + ApplyUpdateAndAssertSpec( + update2, + {PartitionField(1, 1000, "id_bucket_16", Transform::Bucket(16)), + PartitionField(1, 1001, "id_bucket_8", Transform::Bucket(8))}, + 1001); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveIdentityByName) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField("category"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Void()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveBucketByName) { + auto update = CreateUpdatePartitionSpec(true); + 
update->RemoveField("shard"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Void())}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day())}, + 1001); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveIdentityByEquivalent) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField(Expressions::Ref("category")); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Void()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveDayByEquivalent) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField(Expressions::Day("ts")); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Void()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveBucketByEquivalent) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField(Expressions::Bucket("id", 16)); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Void())}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day())}, + 1001); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRename) { + auto update = CreateUpdatePartitionSpec(true); + update->RenameField("shard", "id_bucket"); + ApplyUpdateAndAssertSpec(update, + {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16))}, + 1002); +} + +TEST_P(UpdatePartitionSpecTest, TestMultipleChanges) { + auto update = CreateUpdatePartitionSpec(true); + update->RenameField("shard", "id_bucket") + .RemoveField(Expressions::Day("ts")) + .AddField(Expressions::Truncate("data", 4), "prefix"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Void()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16)), + PartitionField(4, 1003, "prefix", Transform::Truncate(4))}, + 1003); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + 
MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16)), + PartitionField(4, 1003, "prefix", Transform::Truncate(4))}, + 1003); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestAddDeletedName) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField(Expressions::Bucket("id", 16)); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + + ICEBERG_UNWRAP_OR_FAIL(auto updated_spec, GetAppliedSpec(result)); + + if (format_version_ == 1) { + ICEBERG_UNWRAP_OR_FAIL( + auto expected_spec, + PartitionSpec::Make( + PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Void())}, + 1002)); + auto expected = std::shared_ptr(expected_spec.release()); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + ICEBERG_UNWRAP_OR_FAIL( + auto expected_spec, + PartitionSpec::Make( + PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day())}, + 1001)); + auto expected = std::shared_ptr(expected_spec.release()); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveNewlyAddedFieldByName) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->RemoveField("prefix"); + ExpectError(update, ErrorKind::kValidationFailed, "Cannot delete newly added field"); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveNewlyAddedFieldByTransform) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->RemoveField(Expressions::Truncate("data", 4)); + ExpectError(update, ErrorKind::kValidationFailed, 
"Cannot delete newly added field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddAlreadyAddedFieldByTransform) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->AddField(Expressions::Truncate("data", 4)); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddAlreadyAddedFieldByName) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->AddField(Expressions::Truncate("data", 6), "prefix"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddRedundantTimePartition) { + // Test day + hour conflict + auto update1 = CreateUpdatePartitionSpec(false); + update1->AddField(Expressions::Day("ts")); + update1->AddField(Expressions::Hour("ts")); + ExpectError(update1, ErrorKind::kValidationFailed, + "Cannot add redundant partition field"); + + // Test hour + month conflict after adding hour to existing day + auto update2 = CreateUpdatePartitionSpec(true); + update2->AddField(Expressions::Hour("ts")); // day already exists, so hour is OK + update2->AddField(Expressions::Month("ts")); // conflicts with hour + ExpectError(update2, ErrorKind::kValidationFailed, + "Cannot add redundant partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestNoEffectAddDeletedSameFieldWithSameName) { + auto update1 = CreateUpdatePartitionSpec(true); + update1->RemoveField("shard"); + update1->AddField(Expressions::Bucket("id", 16), "shard"); + ICEBERG_UNWRAP_OR_FAIL(auto result1, update1->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto spec1, GetAppliedSpec(result1)); + AssertPartitionSpecEquals(*partitioned_spec_, *spec1); + + auto update2 = CreateUpdatePartitionSpec(true); + update2->RemoveField("shard"); + update2->AddField(Expressions::Bucket("id", 16)); + ICEBERG_UNWRAP_OR_FAIL(auto 
result2, update2->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto spec2, GetAppliedSpec(result2)); + AssertPartitionSpecEquals(*partitioned_spec_, *spec2); +} + +TEST_P(UpdatePartitionSpecTest, TestGenerateNewSpecAddDeletedSameFieldWithDifferentName) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField("shard"); + update->AddField(Expressions::Bucket("id", 16), "new_shard"); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + + ICEBERG_UNWRAP_OR_FAIL(auto updated_spec, GetAppliedSpec(result)); + + ASSERT_EQ(updated_spec->fields().size(), 3); + EXPECT_EQ(updated_spec->fields()[0].name(), "category"); + EXPECT_EQ(updated_spec->fields()[1].name(), "ts_day"); + EXPECT_EQ(updated_spec->fields()[2].name(), "new_shard"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Identity()); + EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Day()); + EXPECT_EQ(*updated_spec->fields()[2].transform(), *Transform::Bucket(16)); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDuplicateByName) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField("category"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDuplicateByRef) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Ref("category")); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDuplicateTransform) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Bucket("id", 16)); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddNamedDuplicate) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Bucket("id", 16), "b16"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + 
+TEST_P(UpdatePartitionSpecTest, TestRemoveUnknownFieldByName) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField("moon"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot find partition field to remove"); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveUnknownFieldByEquivalent) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField(Expressions::Hour("ts")); // day(ts) exists, not hour + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot find partition field to remove"); +} + +// Renaming a name that is not in the spec ("shake") must record a validation error. +TEST_P(UpdatePartitionSpecTest, TestRenameUnknownField) { + auto update = CreateUpdatePartitionSpec(true); + update->RenameField("shake", "seal"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot find partition field to rename: shake"); +} + +// A field added in this update cannot be renamed within the same update. +TEST_P(UpdatePartitionSpecTest, TestRenameAfterAdd) { + auto update = CreateUpdatePartitionSpec(true); + update->AddField(Expressions::Truncate("data", 4), "data_trunc"); + update->RenameField("data_trunc", "prefix"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot rename newly added partition field: data_trunc"); +} + +// Renaming "shard" and then removing bucket(id, 16) is expected to fail as a +// rename-and-delete of "shard". +TEST_P(UpdatePartitionSpecTest, TestRenameAndDelete) { + auto update = CreateUpdatePartitionSpec(true); + update->RenameField("shard", "id_bucket"); + update->RemoveField(Expressions::Bucket("id", 16)); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot rename and delete partition field: shard"); +} + +// Same two operations in the opposite order; the expected message names the delete first. +TEST_P(UpdatePartitionSpecTest, TestDeleteAndRename) { + auto update = CreateUpdatePartitionSpec(true); + update->RemoveField(Expressions::Bucket("id", 16)); + update->RenameField("shard", "id_bucket"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot delete and rename partition field: shard"); +} + +// Alternates adding and removing day(ts) across successive specs, then adds month(ts) and +// renames it to ts_date. +TEST_P(UpdatePartitionSpecTest, TestRemoveAndAddMultiTimes) { + // Add first time + auto update1 = CreateUpdatePartitionSpec(false); + update1->AddField(Expressions::Day("ts"), "ts_date"); + auto add_first_time_spec =
ApplyUpdateAndGetSpec(update1); + + // Remove first time. Each step passes the spec produced by the previous step to + // CreateTableWithSpec and starts a new update from the returned table. + auto table1 = CreateTableWithSpec(add_first_time_spec, "test_table"); + auto update2 = CreateUpdateFromTable(table1); + update2->RemoveField(Expressions::Day("ts")); + auto remove_first_time_spec = ApplyUpdateAndGetSpec(update2); + + // Add second time + auto table2 = CreateTableWithSpec(remove_first_time_spec, "test_table2"); + auto update3 = CreateUpdateFromTable(table2); + update3->AddField(Expressions::Day("ts"), "ts_date"); + auto add_second_time_spec = ApplyUpdateAndGetSpec(update3); + + // Remove second time + auto table3 = CreateTableWithSpec(add_second_time_spec, "test_table3"); + auto update4 = CreateUpdateFromTable(table3); + update4->RemoveField(Expressions::Day("ts")); + auto remove_second_time_spec = ApplyUpdateAndGetSpec(update4); + + // Add third time with month; no name is passed here, and the rename below refers to the + // resulting field as ts_month + auto table4 = CreateTableWithSpec(remove_second_time_spec, "test_table4"); + auto update5 = CreateUpdateFromTable(table4); + update5->AddField(Expressions::Month("ts")); + auto add_third_time_spec = ApplyUpdateAndGetSpec(update5); + + // Rename ts_month to ts_date + auto table5 = CreateTableWithSpec(add_third_time_spec, "test_table5"); + auto update6 = CreateUpdateFromTable(table5); + update6->RenameField("ts_month", "ts_date"); + auto updated_spec = ApplyUpdateAndGetSpec(update6); + + if (format_version_ == 1) { + ASSERT_EQ(updated_spec->fields().size(), 3); + // In V1, we expect void transforms for deleted fields: the first two fields are void + // with names starting with ts_date, and the third is the month field named ts_date + EXPECT_TRUE(updated_spec->fields()[0].name().find("ts_date") == 0); + EXPECT_TRUE(updated_spec->fields()[1].name().find("ts_date") == 0); + EXPECT_EQ(updated_spec->fields()[2].name(), "ts_date"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Void()); + EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Void()); + EXPECT_EQ(*updated_spec->fields()[2].transform(), *Transform::Month()); + } else { + ICEBERG_UNWRAP_OR_FAIL( + auto expected_spec, +
PartitionSpec::Make(PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(2, 1000, "ts_date", Transform::Month())}, + 1000)); + auto expected = std::shared_ptr(expected_spec.release()); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveAndUpdateWithDifferentTransformation) { + auto initial_spec = MakeExpectedSpec( + {PartitionField(2, 1000, "ts_transformed", Transform::Month())}, 1000); + auto table = CreateTableWithSpec(initial_spec, "test_table"); + auto update = CreateUpdateFromTable(table); + update->RemoveField("ts_transformed"); + update->AddField(Expressions::Day("ts"), "ts_transformed"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + + if (format_version_ == 1) { + ASSERT_EQ(updated_spec->fields().size(), 2); + EXPECT_TRUE(updated_spec->fields()[0].name().find("ts_transformed") == 0); + EXPECT_EQ(updated_spec->fields()[1].name(), "ts_transformed"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Void()); + EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Day()); + } else { + ASSERT_EQ(updated_spec->fields().size(), 1); + EXPECT_EQ(updated_spec->fields()[0].name(), "ts_transformed"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Day()); + } +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index ca39ec043..77161628a 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -25,6 +25,7 @@ #include "iceberg/table_requirement.h" #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" +#include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -108,4 +109,11 @@ Result> Transaction::NewUpdateProperties() { return update_properties; } +Result> Transaction::NewUpdateSpec() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_spec, + UpdatePartitionSpec::Make(shared_from_this())); +
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_spec)); + return update_spec; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 36328026b..456a07ca0 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -60,6 +60,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateProperties(); + /// \brief Create a new UpdatePartitionSpec to update the partition spec of this table + /// and commit the changes. + Result> NewUpdateSpec(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit); diff --git a/src/iceberg/transform.cc b/src/iceberg/transform.cc index 614489710..2573ce411 100644 --- a/src/iceberg/transform.cc +++ b/src/iceberg/transform.cc @@ -388,6 +388,28 @@ std::string Transform::ToString() const { std::unreachable(); } +std::string Transform::GeneratePartitionName(std::string_view source_name) const { + switch (transform_type_) { + case TransformType::kIdentity: + return std::string(source_name); + case TransformType::kBucket: + // Format: sourceName_bucket_N (matching Java: sourceName + "_bucket_" + numBuckets) + return std::format("{}_bucket_{}", source_name, std::get(param_)); + case TransformType::kTruncate: + // Format: sourceName_trunc_N (matching Java: sourceName + "_trunc_" + width) + return std::format("{}_trunc_{}", source_name, std::get(param_)); + // Time-based and unknown transforms: sourceName_<TransformTypeToString(transform_type_)> + case TransformType::kYear: + case TransformType::kMonth: + case TransformType::kDay: + case TransformType::kHour: + case TransformType::kUnknown: + return std::format("{}_{}", source_name, TransformTypeToString(transform_type_)); + // Void transform: sourceName_null + case TransformType::kVoid: + return std::format("{}_null", source_name); + } + std::unreachable(); +} + TransformFunction::TransformFunction(TransformType transform_type, std::shared_ptr source_type) : transform_type_(transform_type), source_type_(std::move(source_type)) {} diff --git a/src/iceberg/transform.h b/src/iceberg/transform.h index 53993b4e3..5f2a12a7c 100644 --- a/src/iceberg/transform.h +++ b/src/iceberg/transform.h @@ -197,6 +197,11 @@ class ICEBERG_EXPORT Transform : public util::Formattable { /// \brief Returns a string representation of this transform (e.g., "bucket[16]"). std::string ToString() const override; + /// \brief Generates a default partition field name for this transform. + /// \param source_name The name of the source column. + /// \return The source name itself for identity; otherwise the source name followed by a + /// transform-specific suffix ("_bucket_N", "_trunc_N", "_<transform type>", "_null"). + std::string GeneratePartitionName(std::string_view source_name) const; + /// \brief Equality comparison.
friend bool operator==(const Transform& lhs, const Transform& rhs) { return lhs.Equals(rhs); diff --git a/src/iceberg/transform_function.h b/src/iceberg/transform_function.h index b3cfa5a22..c8670824c 100644 --- a/src/iceberg/transform_function.h +++ b/src/iceberg/transform_function.h @@ -59,6 +59,9 @@ class ICEBERG_EXPORT BucketTransform : public TransformFunction { /// \brief Returns INT32 as the output type. std::shared_ptr ResultType() const override; + /// \brief Returns the number of buckets. + int32_t num_buckets() const { return num_buckets_; } + /// \brief Create a BucketTransform. /// \param source_type Type of the input data. /// \param num_buckets Number of buckets to hash into. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 133a7043c..797fb9ccb 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -126,6 +126,12 @@ class BoundPredicate; class Expression; class Literal; class UnboundPredicate; +class BoundReference; +class BoundTransform; +template +class UnboundTerm; +class NamedReference; +class UnboundTransform; /// \brief Scan. class DataTableScan; @@ -178,6 +184,7 @@ class Transaction; /// \brief Update family. class PendingUpdate; class UpdateProperties; +class UpdatePartitionSpec; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 38502b14e..0ee66fa48 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -16,6 +16,6 @@ # under the License. 
install_headers( - ['pending_update.h', 'update_properties.h'], + ['pending_update.h', 'update_partition_spec.h', 'update_properties.h'], subdir: 'iceberg/update', ) diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index c4618400d..c24bccd43 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: enum class Kind : uint8_t { kUpdateProperties, + kUpdatePartitionSpec, }; /// \brief Return the kind of this pending update. diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc new file mode 100644 index 000000000..8f95fef75 --- /dev/null +++ b/src/iceberg/update/update_partition_spec.cc @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/update_partition_spec.h" + +#include + +#include "iceberg/expression/term.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_update.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> UpdatePartitionSpec::Make( + std::shared_ptr transaction) { + if (!transaction) [[unlikely]] { + return InvalidArgument("Cannot create UpdatePartitionSpec without a transaction"); + } + return std::shared_ptr( + new UpdatePartitionSpec(std::move(transaction))); +} + +UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) { + const TableMetadata* base_metadata = transaction_->base(); + if (base_metadata == nullptr) [[unlikely]] { + AddError(ErrorKind::kInvalidArgument, + "Base table metadata is required to construct UpdatePartitionSpec"); + return; + } + format_version_ = base_metadata->format_version; + + // Get the current/default partition spec. On failure the error is recorded and the + // constructor returns early, leaving spec_ and schema_ unset. + auto spec_result = base_metadata->PartitionSpec(); + if (!spec_result.has_value()) { + AddError(spec_result.error()); + return; + } + spec_ = std::move(spec_result.value()); + + // Get the current schema (same early-return behavior as above; schema_ stays unset) + auto schema_result = base_metadata->Schema(); + if (!schema_result.has_value()) { + AddError(schema_result.error()); + return; + } + schema_ = std::move(schema_result.value()); + + // Seed field-ID assignment from the current spec and index its fields by name and by + // (source_id, transform string). The indexes hold PartitionField pointers taken from + // spec_->fields(); presumably these point into storage owned by *spec_ (confirm). + // NOTE(review): only the current spec's last assigned field ID is used here; confirm that + // IDs used solely by other historical specs cannot be handed out again by AssignFieldId(). + last_assigned_partition_id_ = spec_->last_assigned_field_id(); + name_to_field_ = IndexSpecByName(*spec_); + transform_to_field_ = IndexSpecByTransform(*spec_); + + // Check for unknown transforms; the first one found is recorded as an error and stops + // construction before the historical index is built + for (const auto& field : spec_->fields()) { + if (field.transform()->transform_type() == TransformType::kUnknown) { + AddError(ErrorKind::kInvalidArgument, + "Cannot update partition spec with unknown transform: {}", + field.ToString()); + return; +
} + } + + // Build index of historical partition fields for efficient recycling (V2+) + if (format_version_ >= 2) { + BuildHistoricalFieldsIndex(); + } +} + +UpdatePartitionSpec::~UpdatePartitionSpec() = default; + +UpdatePartitionSpec& UpdatePartitionSpec::CaseSensitive(bool is_case_sensitive) { + case_sensitive_ = is_case_sensitive; + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddNonDefaultSpec() { + set_as_default_ = false; + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::string& source_name) { + // Find the source field in the schema + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + auto field_opt, schema_->FindFieldByName(source_name, case_sensitive_)); + + ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot find source field: {}", + source_name); + int32_t source_id = field_opt->get().field_id(); + return AddFieldInternal(nullptr, source_id, Transform::Identity()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField(std::shared_ptr term, + std::string part_name) { + // Bind the term to get the source field + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, term->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_ref->field().field_id(); + + // Reference terms use identity transform + return AddFieldInternal(part_name.empty() ? nullptr : &part_name, source_id, + Transform::Identity()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField(std::shared_ptr term, + std::string part_name) { + // Bind the term to get the source field and transform + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform, + term->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_transform->reference()->field().field_id(); + return AddFieldInternal(part_name.empty() ? 
nullptr : &part_name, source_id, + bound_transform->transform()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal( + const std::string* name, int32_t source_id, std::shared_ptr transform) { + // Cache transform string to avoid repeated ToString() calls + // The lookup key pairs the source column ID with the transform's string form. + const std::string transform_str = transform->ToString(); + TransformKey validation_key{source_id, transform_str}; + + // Check for duplicate name in added fields (only possible when a name was given) + if (name != nullptr) { + ICEBERG_BUILDER_CHECK(!added_field_names_.contains(*name), + "Cannot add duplicate partition field: {}", *name); + } + + // Check if this field already exists in the current spec + auto existing_it = transform_to_field_.find(validation_key); + if (existing_it != transform_to_field_.end()) { + const auto& existing = existing_it->second; + const bool is_deleted = deletes_.contains(existing->field_id()); + if (is_deleted && *existing->transform() == *transform) { + // If the field was deleted and we're re-adding the same one, just undo the delete + return RewriteDeleteAndAddField(*existing, name); + } + + // A live (not deleted) field with the same source and transform is a duplicate. + ICEBERG_BUILDER_CHECK( + is_deleted, + "Cannot add duplicate partition field {} for source {} with transform {}, " + "conflicts with {}", + name ? *name : "unknown", source_id, transform_str, existing->ToString()); + } + + // Check if already being added (same source and transform added earlier in this update) + auto added_it = transform_to_added_field_.find(validation_key); + ICEBERG_BUILDER_CHECK( + added_it == transform_to_added_field_.end(), + "Cannot add duplicate partition field {} for source {} with transform {}, " + "already added: {}", + name ?
*name : "unknown", source_id, transform_str, added_it->second); + + // Create or recycle the partition field + PartitionField new_field = RecycleOrCreatePartitionField(source_id, transform, name); + + // Generate name if not provided + std::string field_name; + if (name != nullptr) { + field_name = *name; + } else { + field_name = GeneratePartitionName(source_id, transform); + } + + // Create the final field with the name (keeps the source ID, field ID and transform + // chosen above and only replaces the name) + new_field = PartitionField(new_field.source_id(), new_field.field_id(), field_name, + new_field.transform()); + + // Check for redundant time-based partitions. A conflict is recorded as an error but does + // not stop processing here; the field is still registered below. + CheckForRedundantAddedPartitions(new_field); + transform_to_added_field_.emplace(validation_key, field_name); + + // Handle name conflicts with existing fields + auto existing_name_it = name_to_field_.find(field_name); + if (existing_name_it != name_to_field_.end()) { + const auto& existing_field = existing_name_it->second; + const bool existing_is_deleted = deletes_.contains(existing_field->field_id()); + if (!existing_is_deleted) { + if (IsVoidTransform(*existing_field)) { + // Rename the old deleted field (a void-transform field) to <name>_<field_id> + std::string renamed = + std::format("{}_{}", existing_field->name(), existing_field->field_id()); + RenameField(std::string(existing_field->name()), renamed); + } else { + ICEBERG_BUILDER_CHECK(false, "Cannot add duplicate partition field name: {}", + field_name); + } + } else { + // Field is being deleted, rename it to avoid conflict. The rename is written to + // renames_ directly because RenameField() rejects fields that are in deletes_. + std::string renamed = + std::format("{}_{}", existing_field->name(), existing_field->field_id()); + renames_[std::string(existing_field->name())] = renamed; + } + } + + adds_.push_back(std::move(new_field)); + added_field_names_.emplace(field_name); + + return *this; +} + +// Re-adding a field that was deleted in this update: undo the pending delete, and rename the +// existing field only when a different name was requested. +UpdatePartitionSpec& UpdatePartitionSpec::RewriteDeleteAndAddField( + const PartitionField& existing, const std::string* name) { + deletes_.erase(existing.field_id()); + if (name == nullptr || std::string(existing.name()) == *name) { + return *this; + } + return
RenameField(std::string(existing.name()), *name); +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::string& name) { + // Cannot delete newly added fields + ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name), + "Cannot delete newly added field: {}", name); + + // Cannot rename and delete + ICEBERG_BUILDER_CHECK(!renames_.contains(name), + "Cannot rename and delete partition field: {}", name); + + auto field_it = name_to_field_.find(name); + ICEBERG_BUILDER_CHECK(field_it != name_to_field_.end(), + "Cannot find partition field to remove: {}", name); + + deletes_.insert(field_it->second->field_id()); + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveField( + std::shared_ptr term) { + // Bind the term to get the source field + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, term->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_ref->field().field_id(); + + // Reference terms use identity transform + TransformKey key{source_id, Transform::Identity()->ToString()}; + return RemoveFieldByTransform(key, term->ToString()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveField( + std::shared_ptr term) { + // Bind the term to get the source field and transform + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform, + term->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_transform->reference()->field().field_id(); + auto transform = bound_transform->transform(); + + TransformKey key{source_id, transform->ToString()}; + return RemoveFieldByTransform(key, term->ToString()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveFieldByTransform( + const TransformKey& key, const std::string& term_str) { + // Cannot delete newly added fields + ICEBERG_BUILDER_CHECK(!transform_to_added_field_.contains(key), + "Cannot delete newly added field: {}", term_str); + + auto field_it = transform_to_field_.find(key); + ICEBERG_BUILDER_CHECK(field_it != transform_to_field_.end(), + "Cannot find partition field to 
remove: {}", term_str); + + const auto& field = field_it->second; + // Cannot rename and delete + ICEBERG_BUILDER_CHECK(!renames_.contains(std::string(field->name())), + "Cannot rename and delete partition field: {}", field->name()); + + deletes_.insert(field->field_id()); + return *this; +} + +// Records a rename of the existing partition field `name` to `new_name`. +UpdatePartitionSpec& UpdatePartitionSpec::RenameField(const std::string& name, + const std::string& new_name) { + // Handle existing void field with the new name: rename it to <name>_<field_id> first so + // that new_name becomes available. This happens before the validation below. + auto existing_it = name_to_field_.find(new_name); + if (existing_it != name_to_field_.end() && IsVoidTransform(*existing_it->second)) { + std::string renamed = std::format("{}_{}", existing_it->second->name(), + existing_it->second->field_id()); + RenameField(std::string(existing_it->second->name()), renamed); + } + + // Cannot rename newly added fields + ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name), + "Cannot rename newly added partition field: {}", name); + + auto field_it = name_to_field_.find(name); + ICEBERG_BUILDER_CHECK(field_it != name_to_field_.end(), + "Cannot find partition field to rename: {}", name); + + // Cannot delete and rename + ICEBERG_BUILDER_CHECK(!deletes_.contains(field_it->second->field_id()), + "Cannot delete and rename partition field: {}", name); + + // Keyed by the field's current name; Apply() looks renames up by that name. + renames_[name] = new_name; + return *this; +} + +// Builds the new partition spec from the current spec plus the pending deletes, renames and +// adds, and returns the table updates that install it. +Result UpdatePartitionSpec::Apply() { + // Return early if CheckErrors() reports a failure (presumably any error recorded by the + // constructor or by the builder calls - confirm in ErrorCollector). + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + // Reserve capacity to avoid reallocations + const size_t existing_fields_count = spec_->fields().size(); + const size_t adds_count = adds_.size(); + std::vector new_fields; + new_fields.reserve(existing_fields_count + adds_count); + + // Process existing fields + for (const auto& field : spec_->fields()) { + if (!deletes_.contains(field.field_id())) { + // Field is kept, check for rename + auto rename_it = renames_.find(std::string(field.name())); + if (rename_it != renames_.end()) { + new_fields.emplace_back(field.source_id(), field.field_id(), rename_it->second, + field.transform()); + } else {
+ new_fields.push_back(field); + } + } else if (format_version_ < 2) { + // In V1, deleted fields are replaced with void transform (keeping source ID and field + // ID); a pending rename for the deleted field is still honored. + auto rename_it = renames_.find(std::string(field.name())); + std::string field_name = + rename_it != renames_.end() ? rename_it->second : std::string(field.name()); + new_fields.emplace_back(field.source_id(), field.field_id(), std::move(field_name), + Transform::Void()); + } + // In V2, deleted fields are simply removed + } + + // Add new fields (copied, so adds_ stays intact if Apply() is called again) + new_fields.insert(new_fields.end(), adds_.begin(), adds_.end()); + + // Determine the new spec ID. spec_ is non-null here: it was already dereferenced above. + // NOTE(review): current ID + 1 may equal the ID of another existing spec when the current + // spec is not the newest one; confirm the metadata builder reassigns or deduplicates IDs. + int32_t new_spec_id = spec_->spec_id() + 1; + + // In V2, if all fields are removed, reset last_assigned_partition_id to allow + // field IDs to restart from 1000 when fields are added again + int32_t last_assigned_id = last_assigned_partition_id_; + if (format_version_ >= 2 && new_fields.empty()) { + last_assigned_id = PartitionSpec::kLegacyPartitionDataIdStart - 1; + } + + ICEBERG_ASSIGN_OR_RAISE( + auto new_spec, + PartitionSpec::Make(*schema_, new_spec_id, std::move(new_fields), + /*allow_missing_fields=*/false, last_assigned_id)); + + ApplyResult result; + // Read the ID before new_spec is moved into the update below. + auto spec_id = new_spec->spec_id(); + + // Add the new partition spec + result.updates.emplace_back( + std::make_unique(std::move(new_spec))); + + // If set_as_default_ is true, set this spec as the default + if (set_as_default_) { + result.updates.emplace_back( + std::make_unique(spec_id)); + } + + return result; +} + +// Returns the next unused partition field ID and advances the counter. +int32_t UpdatePartitionSpec::AssignFieldId() { return ++last_assigned_partition_id_; } + +// Returns a historical field with the same source and transform (V2+ only) or a newly +// created field with a freshly assigned ID. +PartitionField UpdatePartitionSpec::RecycleOrCreatePartitionField( + int32_t source_id, std::shared_ptr transform, const std::string* name) { + // In V2+, use pre-built index for O(1) lookup instead of O(n*m) iteration + if (format_version_ >= 2 && !historical_fields_.empty()) { + const std::string transform_str = transform->ToString(); + TransformKey key{source_id, transform_str}; + auto it =
historical_fields_.find(key); + if (it != historical_fields_.end()) { + const auto& field = it->second; + // If target name is specified then consider it too, otherwise not + if (name == nullptr || std::string(field.name()) == *name) { + return field; + } + } + } + // No matching field found, create a new one + std::string field_name = name ? *name : ""; + return {source_id, AssignFieldId(), field_name, transform}; +} + +std::string UpdatePartitionSpec::GeneratePartitionName( + int32_t source_id, const std::shared_ptr& transform) const { + // Find the source field name + auto field_result = schema_->FindFieldById(source_id); + std::string_view source_name = "unknown"; + if (field_result.has_value() && field_result.value().has_value()) { + source_name = field_result.value().value().get().name(); + } + + return transform->GeneratePartitionName(std::string(source_name)); +} + +bool UpdatePartitionSpec::IsTimeTransform(const std::shared_ptr& transform) { + switch (transform->transform_type()) { + case TransformType::kYear: + case TransformType::kMonth: + case TransformType::kDay: + case TransformType::kHour: + return true; + default: + return false; + } +} + +bool UpdatePartitionSpec::IsVoidTransform(const PartitionField& field) { + return field.transform()->transform_type() == TransformType::kVoid; +} + +void UpdatePartitionSpec::CheckForRedundantAddedPartitions(const PartitionField& field) { + if (IsTimeTransform(field.transform())) { + if (added_time_fields_.contains(field.source_id())) { + AddError(ErrorKind::kInvalidArgument, + "Cannot add redundant partition field: {} conflicts with {}", + field.ToString(), added_time_fields_.at(field.source_id())); + return; + } + added_time_fields_.emplace(field.source_id(), field.ToString()); + } +} + +std::unordered_map +UpdatePartitionSpec::IndexSpecByName(const PartitionSpec& spec) { + std::unordered_map index; + for (const auto& field : spec.fields()) { + index.emplace(std::string(field.name()), &field); + } + return 
index; +} + +// Maps each (source_id, transform string) pair in `spec` to a pointer to its field; the +// pointers are taken from spec.fields() and must not be used after `spec` is gone. +std::unordered_map +UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) { + std::unordered_map index; + index.reserve(spec.fields().size()); + for (const auto& field : spec.fields()) { + TransformKey key{field.source_id(), field.transform()->ToString()}; + // The key is not needed afterwards, so move it into the map. + index.try_emplace(std::move(key), &field); + } + return index; +} + +// Fills historical_fields_ with one PartitionField per (source_id, transform string) key +// found in any spec of the base table metadata. Does nothing when there is no base metadata. +void UpdatePartitionSpec::BuildHistoricalFieldsIndex() { + const TableMetadata* base_metadata = transaction_->base(); + if (base_metadata == nullptr) { + return; + } + + // Count total fields across all specs to reserve capacity + size_t total_fields = 0; + for (const auto& partition_spec : base_metadata->partition_specs) { + total_fields += partition_spec->fields().size(); + } + historical_fields_.reserve(total_fields); + + // Index all fields from all historical partition specs. + // The first field seen for a (source_id, transform) key wins, in partition_specs order; + // later specs never override an existing entry. + // NOTE(review): only one field is kept per key, so RecycleOrCreatePartitionField cannot + // find a later historical field with the same key but a different (requested) name. + for (const auto& partition_spec : base_metadata->partition_specs) { + for (const auto& field : partition_spec->fields()) { + TransformKey key{field.source_id(), field.transform()->ToString()}; + // try_emplace inserts only when the key is absent and does not copy the field (or + // consume the key) when an entry already exists. + historical_fields_.try_emplace(std::move(key), field); + } + } +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_partition_spec.h b/src/iceberg/update/update_partition_spec.h new file mode 100644 index 000000000..d34f5024e --- /dev/null +++ b/src/iceberg/update/update_partition_spec.h @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/update_partition_spec.h +/// API for partition spec evolution. + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/table_identifier.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +namespace iceberg { + +/// \brief API for partition spec evolution. +/// +/// When committing, these changes will be applied to the current table metadata. +/// Commit conflicts will not be resolved and will result in a CommitFailed error. +class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdatePartitionSpec() override; + + /// \brief Set whether column resolution in the source schema should be case sensitive. + UpdatePartitionSpec& CaseSensitive(bool is_case_sensitive); + + /// \brief Add a new partition field from a source column. + /// + /// The partition field will be created as an identity partition field for the given + /// source column, with the same name as the source column. + /// + /// \param source_name Source column name in the table schema. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& AddField(const std::string& source_name); + + /// \brief Add a new partition field with a custom name. + /// + /// \param term The named reference representing the source column. + /// \param part_name Name for the partition field. + /// \return Reference to this for method chaining. 
+ UpdatePartitionSpec& AddField(std::shared_ptr term, + std::string part_name = ""); + + /// \brief Add a new partition field with a custom name from an unbound transform. + /// + /// \param term The unbound transform representing the partition transform. + /// \param part_name Name for the partition field. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& AddField(std::shared_ptr term, + std::string part_name = ""); + + /// \brief Remove a partition field by name. + /// + /// \param name Name of the partition field to remove. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RemoveField(const std::string& name); + + /// \brief Remove a partition field by its transform term. + /// + /// The partition field with the same transform and source reference will be removed. + /// If the term is a reference and does not have a transform, the identity transform + /// is used. + /// + /// \param term The named reference representing the partition transform to remove. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RemoveField(std::shared_ptr term); + + /// \brief Remove a partition field by its transform term. + /// + /// The partition field with the same transform and source reference will be removed. + /// + /// \param term The unbound transform representing the partition transform to remove. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RemoveField(std::shared_ptr term); + + /// \brief Rename a field in the partition spec. + /// + /// \param name Name of the partition field to rename. + /// \param new_name Replacement name for the partition field. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RenameField(const std::string& name, const std::string& new_name); + + /// \brief Sets that the new partition spec will NOT be set as the default. + /// + /// The default behavior is to set the new spec as the default partition spec. 
+ /// + /// \return Reference to this for method chaining. + UpdatePartitionSpec& AddNonDefaultSpec(); + + Kind kind() const final { return Kind::kUpdatePartitionSpec; } + + Result Apply() final; + + private: + explicit UpdatePartitionSpec(std::shared_ptr transaction); + + /// \brief Pair of source ID and transform string for indexing. + using TransformKey = std::pair; + + /// \brief Hash function for TransformKey. + struct TransformKeyHash { + size_t operator()(const TransformKey& key) const { + return 31 * std::hash{}(key.first) + std::hash{}(key.second); + } + }; + + /// \brief Assign a new partition field ID. + int32_t AssignFieldId(); + + /// \brief Recycle or create a partition field. + /// + /// In V2, searches for a similar partition field in historical specs. + /// If not found or in V1, creates a new PartitionField. + PartitionField RecycleOrCreatePartitionField(int32_t source_id, + std::shared_ptr transform, + const std::string* name); + + /// \brief Internal implementation of AddField with resolved source ID and transform. + UpdatePartitionSpec& AddFieldInternal(const std::string* name, int32_t source_id, + std::shared_ptr transform); + + /// \brief Generate a partition field name from the source and transform. + std::string GeneratePartitionName(int32_t source_id, + const std::shared_ptr& transform) const; + + /// \brief Check if a transform is a time-based transform. + static bool IsTimeTransform(const std::shared_ptr& transform); + + /// \brief Check if a partition field uses void transform. + static bool IsVoidTransform(const PartitionField& field); + + /// \brief Check for redundant time-based partition fields. + void CheckForRedundantAddedPartitions(const PartitionField& field); + + /// \brief Handle rewriting a delete-and-add operation for the same field. + UpdatePartitionSpec& RewriteDeleteAndAddField(const PartitionField& existing, + const std::string* name); + + /// \brief Internal helper to remove a field by transform key. 
+ UpdatePartitionSpec& RemoveFieldByTransform(const TransformKey& key, + const std::string& term_str); + + /// \brief Index the spec fields by name. + static std::unordered_map IndexSpecByName( + const PartitionSpec& spec); + + /// \brief Index the spec fields by (source_id, transform) pair. + static std::unordered_map + IndexSpecByTransform(const PartitionSpec& spec); + + /// \brief Build index of historical partition fields for efficient recycling (V2+). + void BuildHistoricalFieldsIndex(); + + // Configuration + int32_t format_version_; + std::shared_ptr spec_; + std::shared_ptr schema_; + bool case_sensitive_{true}; + bool set_as_default_{true}; + int32_t last_assigned_partition_id_; + + // Indexes for existing fields + std::unordered_map name_to_field_; + std::unordered_map + transform_to_field_; + + // Index for historical partition fields (V2+ only) for efficient recycling + // Maps (source_id, transform_string) -> PartitionField from all historical specs + std::unordered_map historical_fields_; + + // Pending changes + std::vector adds_; + std::unordered_set added_field_names_; + std::unordered_map added_time_fields_; + std::unordered_map + transform_to_added_field_; + std::unordered_set deletes_; + std::unordered_map renames_; +}; + +} // namespace iceberg