From 7ed91d9dee113826dbcd85bc05a8b4ca5b30d1f9 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 15 Nov 2025 11:33:56 +0800 Subject: [PATCH 01/20] feat: add table-update api to transaction interface --- src/iceberg/transaction.h | 70 +++++++++++++++++++++++++++++++++++++++ src/iceberg/type_fwd.h | 19 +++++++++++ 2 files changed, 89 insertions(+) diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 72ba5182c..e2bc490f2 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -38,6 +38,76 @@ class ICEBERG_EXPORT Transaction { /// \return this transaction's table virtual const std::shared_ptr& table() const = 0; + /// \brief Create a new schema addition operation + /// + /// \return a new AddSchema + virtual std::shared_ptr AddSchema() = 0; + + /// \brief Create a new set current schema operation + /// + /// \param schema_id the schema id to set as current + /// \return a new SetCurrentSchema + virtual std::shared_ptr SetCurrentSchema(int32_t schema_id) = 0; + + /// \brief Create a new remove schemas operation + /// + /// \param schema_ids the schema ids to remove + /// \return a new RemoveSchemas + virtual std::shared_ptr RemoveSchemas( + const std::vector& schema_ids) = 0; + + /// \brief Create a new partition spec addition operation + /// + /// \return a new AddPartitionSpec + virtual std::shared_ptr AddPartitionSpec() = 0; + + /// \brief Create a new set default partition spec operation + /// + /// \param spec_id the partition spec id to set as default + /// \return a new SetDefaultPartitionSpec + virtual std::shared_ptr SetDefaultPartitionSpec( + int32_t spec_id) = 0; + + /// \brief Create a new remove partition specs operation + /// + /// \param spec_ids the partition spec ids to remove + /// \return a new RemovePartitionSpecs + virtual std::shared_ptr RemovePartitionSpecs( + const std::vector& spec_ids) = 0; + + /// \brief Create a new sort order addition operation + /// + /// \return a new AddSortOrder + virtual std::shared_ptr AddSortOrder() = 0; + + /// \brief Create a new set default sort order operation + /// + /// \param order_id the sort order id to set as default + /// \return a new SetDefaultSortOrder + virtual std::shared_ptr SetDefaultSortOrder(int32_t order_id) = 0; + + /// \brief Create a new remove sort orders operation + /// + /// \param order_ids the sort order ids to remove + /// \return a new RemoveSortOrders + virtual std::shared_ptr RemoveSortOrders( + const std::vector& order_ids) = 0; + + /// \brief Create a new set properties operation + /// + /// \return a new SetProperties + virtual std::shared_ptr SetProperties() = 0; + + /// \brief Create a new remove properties operation + /// + /// \return a new RemoveProperties + virtual std::shared_ptr RemoveProperties() = 0; + + /// \brief Create a new set location operation + /// + /// \return a new SetLocation + virtual std::shared_ptr SetLocation() = 0; + /// \brief Create a new append API to add files to this table /// /// \return a new AppendFiles diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0e1867f60..7a7fe0cfc 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -155,6 +155,25 @@ class StructLike; class StructLikeAccessor; class TableUpdate; +class AssignUUID; +class UpgradeFormatVersion; +class AddSchema; +class SetCurrentSchema; +class RemoveSchemas; +class AddPartitionSpec; +class SetDefaultPartitionSpec; +class RemovePartitionSpecs; +class AddSortOrder; +class SetDefaultSortOrder; +class RemoveSortOrders; +class AddSnapshot; +class RemoveSnapshots; +class RemoveSnapshotRef; +class SetSnapshotRef; +class SetProperties; +class RemoveProperties; +class SetLocation; + class TableRequirement; class TableMetadataBuilder; class TableUpdateContext; From 235b4bc0b538ce5a8f658a571117157035745dfd Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 29 Nov 2025 21:49:12 +0800 Subject: [PATCH 02/20] feat: transactional UpdateProperties support --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/base_transaction.cc | 83 ++++++++ src/iceberg/base_transaction.h | 56 ++++++ src/iceberg/pending_update.cc | 64 ++++++ src/iceberg/pending_update.h | 16 ++ src/iceberg/table.h | 3 + src/iceberg/table_metadata.cc | 13 ++ src/iceberg/table_metadata.h | 17 ++ src/iceberg/table_update.cc | 10 +- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/pending_update_test.cc | 103 ++++++++++ .../test/transaction_pending_update_test.cc | 183 ++++++++++++++++++ src/iceberg/transaction.h | 71 +------ src/iceberg/type_fwd.h | 28 +-- 14 files changed, 565 insertions(+), 85 deletions(-) create mode 100644 src/iceberg/base_transaction.cc create mode 100644 src/iceberg/base_transaction.h create mode 100644 src/iceberg/pending_update.cc create mode 100644 src/iceberg/test/pending_update_test.cc create mode 100644 src/iceberg/test/transaction_pending_update_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 6e9eb0baf..30508ebde 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -19,6 +19,7 @@ set(ICEBERG_INCLUDES "$" "$") set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc + base_transaction.cc catalog/memory/in_memory_catalog.cc expression/aggregate.cc expression/binder.cc @@ -53,6 +54,7 @@ set(ICEBERG_SOURCES partition_field.cc partition_spec.cc partition_summary.cc + pending_update.cc row/arrow_array_wrapper.cc row/manifest_wrapper.cc row/partition_values.cc diff --git a/src/iceberg/base_transaction.cc b/src/iceberg/base_transaction.cc new file mode 100644 index 000000000..994c146f5 --- /dev/null +++ b/src/iceberg/base_transaction.cc @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/base_transaction.h" + +#include + +#include "iceberg/catalog.h" +#include "iceberg/pending_update.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_requirements.h" +#include "iceberg/table_update.h" + +namespace iceberg { + +BaseTransaction::BaseTransaction(std::shared_ptr
table, + std::shared_ptr catalog) + : table_(std::move(table)), catalog_(std::move(catalog)) { + ICEBERG_DCHECK(table_ != nullptr, "table must not be null"); + ICEBERG_DCHECK(catalog_ != nullptr, "catalog must not be null"); +} + +const std::shared_ptr
& BaseTransaction::table() const { return table_; } + +std::shared_ptr BaseTransaction::UpdateProperties() { + return RegisterUpdate(); +} + +std::shared_ptr BaseTransaction::NewAppend() { + throw NotImplemented("BaseTransaction::NewAppend not implemented"); +} + +Status BaseTransaction::CommitTransaction() { + const auto& metadata = table_->metadata(); + if (!metadata) { + return InvalidArgument("Table metadata is null"); + } + + auto builder = TableMetadataBuilder::BuildFrom(metadata.get()); + for (const auto& pending_update : pending_updates_) { + if (!pending_update) { + continue; + } + ICEBERG_RETURN_UNEXPECTED(pending_update->Apply(*builder)); + } + + auto table_updates = builder->GetChanges(); + TableUpdateContext context(metadata.get(), /*is_replace=*/false); + for (const auto& update : table_updates) { + ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context)); + } + ICEBERG_ASSIGN_OR_RAISE(auto table_requirements, context.Build()); + + ICEBERG_ASSIGN_OR_RAISE( + auto updated_table, + catalog_->UpdateTable(table_->name(), table_requirements, table_updates)); + + if (updated_table) { + table_ = std::shared_ptr
(std::move(updated_table)); + } + + pending_updates_.clear(); + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h new file mode 100644 index 000000000..e40e51324 --- /dev/null +++ b/src/iceberg/base_transaction.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/transaction.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base class for transaction implementations +class BaseTransaction : public Transaction { + public: + BaseTransaction(std::shared_ptr
table, std::shared_ptr catalog); + ~BaseTransaction() override = default; + + const std::shared_ptr
& table() const override; + + std::shared_ptr UpdateProperties() override; + + std::shared_ptr NewAppend() override; + + Status CommitTransaction() override; + + protected: + template + std::shared_ptr RegisterUpdate(Args&&... args) { + auto update = std::make_shared(std::forward(args)...); + pending_updates_.push_back(update); + return update; + } + + std::shared_ptr
table_; + std::shared_ptr catalog_; + std::vector> pending_updates_; +}; + +} // namespace iceberg diff --git a/src/iceberg/pending_update.cc b/src/iceberg/pending_update.cc new file mode 100644 index 000000000..d367466b1 --- /dev/null +++ b/src/iceberg/pending_update.cc @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/pending_update.h" + +#include "iceberg/catalog.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_update.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +// ============================================================================ +// UpdateProperties implementation +// ============================================================================ + +PropertiesUpdate& PropertiesUpdate::Set(std::string const& key, + std::string const& value) { + updates_[key] = value; + return *this; +} + +PropertiesUpdate& PropertiesUpdate::Remove(std::string const& key) { + removals_.push_back(key); + return *this; +} + +Result PropertiesUpdate::Apply() { + return PropertiesUpdateChanges{updates_, removals_}; +} + +Status PropertiesUpdate::ApplyResult(TableMetadataBuilder& builder, + PropertiesUpdateChanges result) { + if (!result.updates.empty()) { + builder.SetProperties(result.updates); + } + if (!result.removals.empty()) { + builder.RemoveProperties(result.removals); + } + return {}; +} + +Status PropertiesUpdate::Commit() { + return NotImplemented("UpdateProperties::Commit() not implemented"); +} + +} // namespace iceberg diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h index 8370db141..f293fdbb6 100644 --- a/src/iceberg/pending_update.h +++ b/src/iceberg/pending_update.h @@ -22,10 +22,15 @@ /// \file iceberg/pending_update.h /// API for table changes using builder pattern +#include +#include +#include + #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" namespace iceberg { @@ -67,6 +72,17 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { protected: PendingUpdate() = default; + + /// \brief Apply the pending changes to a TableMetadataBuilder + /// + /// This method applies the changes by calling builder's specific methods. + /// The builder will automatically record corresponding TableUpdate objects. + /// + /// \param builder The TableMetadataBuilder to apply changes to + /// \return Status::OK if the changes were applied successfully, or an error + virtual Status Apply(TableMetadataBuilder& builder) = 0; + + friend class BaseTransaction; }; } // namespace iceberg diff --git a/src/iceberg/table.h b/src/iceberg/table.h index df3a0c32e..2c5c2a6f7 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -124,6 +124,9 @@ class ICEBERG_EXPORT Table { /// \brief Returns a FileIO to read and write table data and metadata files const std::shared_ptr& io() const; + /// \brief Return the underlying table metadata + const std::shared_ptr& metadata() const { return metadata_; } + private: const TableIdentifier identifier_; std::shared_ptr metadata_; diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index a90c5b0ca..acb5857ac 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -670,4 +670,17 @@ Result> TableMetadataBuilder::Build() { return result; } +std::vector> TableMetadataBuilder::GetChanges() { + return std::move(impl_->changes); +} + +std::vector TableMetadataBuilder::GetChangesView() const { + std::vector result; + result.reserve(impl_->changes.size()); + for (const auto& change : impl_->changes) { + result.push_back(change.get()); + } + return result; +} + } // namespace iceberg diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index c00bd5e84..8470e0327 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -421,6 +421,23 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \brief Destructor ~TableMetadataBuilder() override; + /// \brief Get all recorded TableUpdate changes + /// + /// This method extracts all TableUpdate objects that were automatically + /// recorded when modification methods were called. The changes are moved + /// out of the builder, so this method should only be called once. + /// + /// \return A vector of TableUpdate objects representing all changes + std::vector> GetChanges(); + + /// \brief Get all recorded TableUpdate changes (const version for inspection) + /// + /// This method returns a view of all TableUpdate objects without moving them. + /// Useful for inspection before committing. + /// + /// \return A vector of const pointers to TableUpdate objects + std::vector GetChangesView() const; + // Delete copy operations (use BuildFrom to create a new builder) TableMetadataBuilder(const TableMetadataBuilder&) = delete; TableMetadataBuilder& operator=(const TableMetadataBuilder&) = delete; diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 90f7de622..ee5f9d67e 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -180,8 +180,9 @@ void SetProperties::ApplyTo(TableMetadataBuilder& builder) const { builder.SetProperties(updated_); } -void SetProperties::GenerateRequirements(TableUpdateContext& context) const { - // SetProperties doesn't generate any requirements +Status SetProperties::GenerateRequirements(TableUpdateContext& context) const { + // No requirements + return {}; } // RemoveProperties @@ -190,8 +191,9 @@ void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const { builder.RemoveProperties(removed_); } -void RemoveProperties::GenerateRequirements(TableUpdateContext& context) const { - // RemoveProperties doesn't generate any requirements +Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const { + // No requirements + return {}; } // SetLocation diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index a48567132..dba889bc1 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -77,6 +77,7 @@ add_iceberg_test(table_test table_requirement_test.cc table_requirements_test.cc table_update_test.cc + transaction_pending_update_test.cc update_properties_test.cc) add_iceberg_test(expression_test diff --git a/src/iceberg/test/pending_update_test.cc b/src/iceberg/test/pending_update_test.cc new file mode 100644 index 000000000..2def1f011 --- /dev/null +++ b/src/iceberg/test/pending_update_test.cc @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/pending_update.h" + +#include + +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// Mock implementation for testing the interface +class MockSnapshot {}; + +class MockPendingUpdate : public PendingUpdateTyped { + public: + MockPendingUpdate() = default; + + Result Apply() override { + if (should_fail_) { + return ValidationFailed("Mock validation failed"); + } + apply_called_ = true; + return MockSnapshot{}; + } + + Status ApplyResult(TableMetadataBuilder& builder, MockSnapshot result) override { + return {}; + } + + Status Commit() override { + if (should_fail_commit_) { + return CommitFailed("Mock commit failed"); + } + commit_called_ = true; + return {}; + } + + void SetShouldFail(bool fail) { should_fail_ = fail; } + void SetShouldFailCommit(bool fail) { should_fail_commit_ = fail; } + bool ApplyCalled() const { return apply_called_; } + bool CommitCalled() const { return commit_called_; } + + private: + bool should_fail_ = false; + bool should_fail_commit_ = false; + bool apply_called_ = false; + bool commit_called_ = false; +}; + +TEST(PendingUpdateTest, ApplySuccess) { + MockPendingUpdate update; + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); +} + +TEST(PendingUpdateTest, ApplyValidationFailed) { + MockPendingUpdate update; + update.SetShouldFail(true); + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Mock validation failed")); +} + +TEST(PendingUpdateTest, CommitSuccess) { + MockPendingUpdate update; + auto status = update.Commit(); + EXPECT_THAT(status, IsOk()); + EXPECT_TRUE(update.CommitCalled()); +} + +TEST(PendingUpdateTest, CommitFailed) { + MockPendingUpdate update; + update.SetShouldFailCommit(true); + auto status = update.Commit(); + EXPECT_THAT(status, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(status, HasErrorMessage("Mock commit failed")); +} + +TEST(PendingUpdateTest, BaseClassPolymorphism) { + std::unique_ptr base_ptr = std::make_unique(); + auto status = base_ptr->Commit(); + EXPECT_THAT(status, IsOk()); +} + +} // namespace iceberg diff --git a/src/iceberg/test/transaction_pending_update_test.cc b/src/iceberg/test/transaction_pending_update_test.cc new file mode 100644 index 000000000..77ec6f984 --- /dev/null +++ b/src/iceberg/test/transaction_pending_update_test.cc @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/base_transaction.h" +#include "iceberg/partition_spec.h" +#include "iceberg/pending_update.h" +#include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_requirements.h" +#include "iceberg/table_update.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { +namespace { + +using ::testing::ElementsAre; +using ::testing::NiceMock; + +TableIdentifier MakeIdentifier() { + return TableIdentifier{ + .ns = Namespace{.levels = {"test_ns"}}, + .name = "test_table", + }; +} + +std::shared_ptr CreateBaseMetadata() { + auto metadata = std::make_shared(); + metadata->format_version = TableMetadata::kDefaultTableFormatVersion; + metadata->table_uuid = "test-uuid"; + metadata->location = "s3://bucket/table"; + metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber; + metadata->last_updated_ms = + TimePointMs{std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch())}; + metadata->last_column_id = 0; + metadata->default_spec_id = PartitionSpec::kInitialSpecId; + metadata->last_partition_id = 0; + metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; + metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->next_row_id = TableMetadata::kInitialRowId; + metadata->properties = {{"existing", "value"}}; + return metadata; +} + +std::shared_ptr
CreateTestTable(const TableIdentifier& identifier, + const std::shared_ptr& metadata, + const std::shared_ptr& catalog) { + return std::make_shared
(identifier, metadata, "s3://bucket/table/metadata.json", + nullptr, catalog); +} + +TEST(TransactionPendingUpdateTest, CommitSetPropertiesUsesCatalog) { + auto metadata = CreateBaseMetadata(); + const auto identifier = MakeIdentifier(); + auto catalog = std::make_shared>(); + auto table = + CreateTestTable(identifier, std::make_shared(*metadata), catalog); + BaseTransaction transaction(table, catalog); + + auto update_properties = transaction.UpdateProperties(); + update_properties->Set("write.metadata.delete-after-commit.enabled", "true"); + + EXPECT_CALL(*catalog, + UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) + .WillOnce([](const TableIdentifier& id, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ("test_table", id.name); + EXPECT_EQ(1u, updates.size()); + const auto* set_update = + dynamic_cast(updates.front().get()); + EXPECT_NE(set_update, nullptr); + const auto& updated = set_update->updated(); + auto it = updated.find("write.metadata.delete-after-commit.enabled"); + EXPECT_NE(it, updated.end()); + EXPECT_EQ("true", it->second); + return Result>(std::unique_ptr
()); + }); + + EXPECT_THAT(transaction.CommitTransaction(), IsOk()); +} + +TEST(TransactionPendingUpdateTest, RemovePropertiesSkipsMissingKeys) { + auto metadata = CreateBaseMetadata(); + const auto identifier = MakeIdentifier(); + auto catalog = std::make_shared>(); + auto table = + CreateTestTable(identifier, std::make_shared(*metadata), catalog); + BaseTransaction transaction(table, catalog); + + auto update_properties = transaction.UpdateProperties(); + update_properties->Remove("missing").Remove("existing"); + + EXPECT_CALL(*catalog, + UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) + .WillOnce([](const TableIdentifier&, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ(1u, updates.size()); + const auto* remove_update = + dynamic_cast(updates.front().get()); + EXPECT_NE(remove_update, nullptr); + EXPECT_THAT(remove_update->removed(), ElementsAre("existing")); + return Result>(std::unique_ptr
()); + }); + + EXPECT_THAT(transaction.CommitTransaction(), IsOk()); +} + +TEST(TransactionPendingUpdateTest, AggregatesMultiplePendingUpdates) { + auto metadata = CreateBaseMetadata(); + const auto identifier = MakeIdentifier(); + auto catalog = std::make_shared>(); + auto table = + CreateTestTable(identifier, std::make_shared(*metadata), catalog); + BaseTransaction transaction(table, catalog); + + auto update_properties = transaction.UpdateProperties(); + update_properties->Set("new-key", "new-value").Remove("existing"); + + EXPECT_CALL(*catalog, + UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) + .WillOnce([](const TableIdentifier&, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ(2u, updates.size()); + + const auto* set_update = + dynamic_cast(updates[0].get()); + EXPECT_NE(set_update, nullptr); + const auto& updated = set_update->updated(); + auto it = updated.find("new-key"); + EXPECT_NE(it, updated.end()); + EXPECT_EQ("new-value", it->second); + + const auto* remove_update = + dynamic_cast(updates[1].get()); + EXPECT_NE(remove_update, nullptr); + EXPECT_THAT(remove_update->removed(), ElementsAre("existing")); + + return Result>(std::unique_ptr
()); + }); + + EXPECT_THAT(transaction.CommitTransaction(), IsOk()); +} + +} // namespace +} // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index e2bc490f2..24ed14a56 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -38,75 +38,10 @@ class ICEBERG_EXPORT Transaction { /// \return this transaction's table virtual const std::shared_ptr
& table() const = 0; - /// \brief Create a new schema addition operation + /// \brief Create a new update properties operation /// - /// \return a new AddSchema - virtual std::shared_ptr AddSchema() = 0; - - /// \brief Create a new set current schema operation - /// - /// \param schema_id the schema id to set as current - /// \return a new SetCurrentSchema - virtual std::shared_ptr SetCurrentSchema(int32_t schema_id) = 0; - - /// \brief Create a new remove schemas operation - /// - /// \param schema_ids the schema ids to remove - /// \return a new RemoveSchemas - virtual std::shared_ptr RemoveSchemas( - const std::vector& schema_ids) = 0; - - /// \brief Create a new partition spec addition operation - /// - /// \return a new AddPartitionSpec - virtual std::shared_ptr AddPartitionSpec() = 0; - - /// \brief Create a new set default partition spec operation - /// - /// \param spec_id the partition spec id to set as default - /// \return a new SetDefaultPartitionSpec - virtual std::shared_ptr SetDefaultPartitionSpec( - int32_t spec_id) = 0; - - /// \brief Create a new remove partition specs operation - /// - /// \param spec_ids the partition spec ids to remove - /// \return a new RemovePartitionSpecs - virtual std::shared_ptr RemovePartitionSpecs( - const std::vector& spec_ids) = 0; - - /// \brief Create a new sort order addition operation - /// - /// \return a new AddSortOrder - virtual std::shared_ptr AddSortOrder() = 0; - - /// \brief Create a new set default sort order operation - /// - /// \param order_id the sort order id to set as default - /// \return a new SetDefaultSortOrder - virtual std::shared_ptr SetDefaultSortOrder(int32_t order_id) = 0; - - /// \brief Create a new remove sort orders operation - /// - /// \param order_ids the sort order ids to remove - /// \return a new RemoveSortOrders - virtual std::shared_ptr RemoveSortOrders( - const std::vector& order_ids) = 0; - - /// \brief Create a new set properties operation - /// - /// \return a new SetProperties - virtual std::shared_ptr SetProperties() = 0; - - /// \brief Create a new remove properties operation - /// - /// \return a new RemoveProperties - virtual std::shared_ptr RemoveProperties() = 0; - - /// \brief Create a new set location operation - /// - /// \return a new SetLocation - virtual std::shared_ptr SetLocation() = 0; + /// \return a new UpdateProperties + virtual std::shared_ptr UpdateProperties() = 0; /// \brief Create a new append API to add files to this table /// diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 7a7fe0cfc..e36fdf298 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -155,24 +155,24 @@ class StructLike; class StructLikeAccessor; class TableUpdate; -class AssignUUID; -class UpgradeFormatVersion; -class AddSchema; -class SetCurrentSchema; -class RemoveSchemas; class AddPartitionSpec; -class SetDefaultPartitionSpec; -class RemovePartitionSpecs; -class AddSortOrder; -class SetDefaultSortOrder; -class RemoveSortOrders; +class AddSchema; class AddSnapshot; +class AddSortOrder; +class AssignUUID; +class RemovePartitionSpecs; +class RemoveProperties; +class RemoveSchemas; class RemoveSnapshots; class RemoveSnapshotRef; -class SetSnapshotRef; -class SetProperties; -class RemoveProperties; +class RemoveSortOrders; +class SetCurrentSchema; +class SetDefaultPartitionSpec; +class SetDefaultSortOrder; class SetLocation; +class SetProperties; +class SetSnapshotRef; +class UpgradeFormatVersion; class TableRequirement; class TableMetadataBuilder; @@ -183,6 +183,8 @@ template class PendingUpdateTyped; class UpdateProperties; +struct PropertiesUpdateChanges; + /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. /// ---------------------------------------------------------------------------- From 2d73eb691ee471dfc7412b19d63842fbc7861901 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 29 Nov 2025 22:23:39 +0800 Subject: [PATCH 03/20] feat: transactional UpdateProperties method support --- src/iceberg/base_transaction.cc | 6 +-- src/iceberg/base_transaction.h | 6 +-- src/iceberg/pending_update.cc | 4 -- src/iceberg/table.cc | 5 ++- src/iceberg/table.h | 2 +- .../test/transaction_pending_update_test.cc | 41 ++++++------------- src/iceberg/transaction.h | 2 +- 7 files changed, 24 insertions(+), 42 deletions(-) diff --git a/src/iceberg/base_transaction.cc b/src/iceberg/base_transaction.cc index 994c146f5..f22e1f090 100644 --- a/src/iceberg/base_transaction.cc +++ b/src/iceberg/base_transaction.cc @@ -19,8 +19,6 @@ #include "iceberg/base_transaction.h" -#include - #include "iceberg/catalog.h" #include "iceberg/pending_update.h" #include "iceberg/table.h" @@ -30,14 +28,14 @@ namespace iceberg { -BaseTransaction::BaseTransaction(std::shared_ptr
table, +BaseTransaction::BaseTransaction(std::shared_ptr table, std::shared_ptr catalog) : table_(std::move(table)), catalog_(std::move(catalog)) { ICEBERG_DCHECK(table_ != nullptr, "table must not be null"); ICEBERG_DCHECK(catalog_ != nullptr, "catalog must not be null"); } -const std::shared_ptr
& BaseTransaction::table() const { return table_; } +const std::shared_ptr& BaseTransaction::table() const { return table_; } std::shared_ptr BaseTransaction::UpdateProperties() { return RegisterUpdate(); diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index e40e51324..cab21753d 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -29,10 +29,10 @@ namespace iceberg { /// \brief Base class for transaction implementations class BaseTransaction : public Transaction { public: - BaseTransaction(std::shared_ptr
table, std::shared_ptr catalog); + BaseTransaction(std::shared_ptr table, std::shared_ptr catalog); ~BaseTransaction() override = default; - const std::shared_ptr
& table() const override; + const std::shared_ptr& table() const override; std::shared_ptr UpdateProperties() override; @@ -48,7 +48,7 @@ class BaseTransaction : public Transaction { return update; } - std::shared_ptr
table_; + std::shared_ptr table_; std::shared_ptr catalog_; std::vector> pending_updates_; }; diff --git a/src/iceberg/pending_update.cc b/src/iceberg/pending_update.cc index d367466b1..6d7f9b224 100644 --- a/src/iceberg/pending_update.cc +++ b/src/iceberg/pending_update.cc @@ -27,10 +27,6 @@ namespace iceberg { -// ============================================================================ -// UpdateProperties implementation -// ============================================================================ - PropertiesUpdate& PropertiesUpdate::Set(std::string const& key, std::string const& value) { updates_[key] = value; diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 458711255..ffc33062b 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,6 +19,9 @@ #include "iceberg/table.h" +#include + +#include "iceberg/base_transaction.h" #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" @@ -114,7 +117,7 @@ std::unique_ptr Table::UpdateProperties() const { } std::unique_ptr Table::NewTransaction() const { - throw NotImplemented("Table::NewTransaction is not implemented"); + return std::make_unique(shared_from_this(), catalog_); } const std::shared_ptr& Table::io() const { return io_; } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 2c5c2a6f7..cabd719d3 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -33,7 +33,7 @@ namespace iceberg { /// \brief Represents an Iceberg table -class ICEBERG_EXPORT Table { +class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ public: ~Table(); diff --git a/src/iceberg/test/transaction_pending_update_test.cc b/src/iceberg/test/transaction_pending_update_test.cc index 77ec6f984..cfb6fa775 100644 --- a/src/iceberg/test/transaction_pending_update_test.cc +++ b/src/iceberg/test/transaction_pending_update_test.cc @@ -17,34 +17,20 @@ * under the License. */ -#include -#include -#include -#include -#include - -#include #include #include "iceberg/base_transaction.h" #include "iceberg/partition_spec.h" #include "iceberg/pending_update.h" -#include "iceberg/snapshot.h" #include "iceberg/sort_order.h" #include "iceberg/table.h" -#include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" -#include "iceberg/transaction.h" -#include "iceberg/util/macros.h" namespace iceberg { namespace { - using ::testing::ElementsAre; using ::testing::NiceMock; @@ -80,6 +66,7 @@ std::shared_ptr
CreateTestTable(const TableIdentifier& identifier, return std::make_shared
(identifier, metadata, "s3://bucket/table/metadata.json", nullptr, catalog); } +} // namespace TEST(TransactionPendingUpdateTest, CommitSetPropertiesUsesCatalog) { auto metadata = CreateBaseMetadata(); @@ -87,10 +74,9 @@ TEST(TransactionPendingUpdateTest, CommitSetPropertiesUsesCatalog) { auto catalog = std::make_shared>(); auto table = CreateTestTable(identifier, std::make_shared(*metadata), catalog); - BaseTransaction transaction(table, catalog); - - auto update_properties = transaction.UpdateProperties(); - update_properties->Set("write.metadata.delete-after-commit.enabled", "true"); + auto transaction = table->NewTransaction(); + auto update_properties = transaction->UpdateProperties(); + update_properties->Set("new-key", "new-value"); EXPECT_CALL(*catalog, UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) @@ -104,13 +90,13 @@ TEST(TransactionPendingUpdateTest, CommitSetPropertiesUsesCatalog) { dynamic_cast(updates.front().get()); EXPECT_NE(set_update, nullptr); const auto& updated = set_update->updated(); - auto it = updated.find("write.metadata.delete-after-commit.enabled"); + auto it = updated.find("new-key"); EXPECT_NE(it, updated.end()); - EXPECT_EQ("true", it->second); + EXPECT_EQ("new-value", it->second); return Result>(std::unique_ptr
()); }); - EXPECT_THAT(transaction.CommitTransaction(), IsOk()); + EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } TEST(TransactionPendingUpdateTest, RemovePropertiesSkipsMissingKeys) { @@ -119,9 +105,9 @@ TEST(TransactionPendingUpdateTest, RemovePropertiesSkipsMissingKeys) { auto catalog = std::make_shared>(); auto table = CreateTestTable(identifier, std::make_shared(*metadata), catalog); - BaseTransaction transaction(table, catalog); + auto transaction = table->NewTransaction(); - auto update_properties = transaction.UpdateProperties(); + auto update_properties = transaction->UpdateProperties(); update_properties->Remove("missing").Remove("existing"); EXPECT_CALL(*catalog, @@ -138,7 +124,7 @@ TEST(TransactionPendingUpdateTest, RemovePropertiesSkipsMissingKeys) { return Result>(std::unique_ptr
()); }); - EXPECT_THAT(transaction.CommitTransaction(), IsOk()); + EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } TEST(TransactionPendingUpdateTest, AggregatesMultiplePendingUpdates) { @@ -147,9 +133,9 @@ TEST(TransactionPendingUpdateTest, AggregatesMultiplePendingUpdates) { auto catalog = std::make_shared>(); auto table = CreateTestTable(identifier, std::make_shared(*metadata), catalog); - BaseTransaction transaction(table, catalog); + auto transaction = table->NewTransaction(); - auto update_properties = transaction.UpdateProperties(); + auto update_properties = transaction->UpdateProperties(); update_properties->Set("new-key", "new-value").Remove("existing"); EXPECT_CALL(*catalog, @@ -176,8 +162,7 @@ TEST(TransactionPendingUpdateTest, AggregatesMultiplePendingUpdates) { return Result>(std::unique_ptr
()); }); - EXPECT_THAT(transaction.CommitTransaction(), IsOk()); + EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } -} // namespace } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 24ed14a56..7b54b65ea 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -36,7 +36,7 @@ class ICEBERG_EXPORT Transaction { /// \brief Return the Table that this transaction will update /// /// \return this transaction's table - virtual const std::shared_ptr
& table() const = 0; + virtual const std::shared_ptr& table() const = 0; /// \brief Create a new update properties operation /// From a72ae7a1d76d1bb9c42095c6472094109b7d59e6 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 29 Nov 2025 22:36:56 +0800 Subject: [PATCH 04/20] feat: transactional UpdateProperties method support --- src/iceberg/pending_update.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h index f293fdbb6..5c5cf784a 100644 --- a/src/iceberg/pending_update.h +++ b/src/iceberg/pending_update.h @@ -22,7 +22,6 @@ /// \file iceberg/pending_update.h /// API for table changes using builder pattern -#include #include #include From 18fd9af95f5cafbbaa018a253b52c6c07c0ddf71 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Mon, 1 Dec 2025 09:47:06 +0800 Subject: [PATCH 05/20] feat: transactional UpdateProperties method support --- src/iceberg/meson.build | 2 ++ src/iceberg/pending_update.cc | 5 ++++- src/iceberg/test/CMakeLists.txt | 2 ++ ...ng_update_test.cc => base_transaction_test.cc} | 15 ++++++++------- 4 files changed, 16 insertions(+), 8 deletions(-) rename src/iceberg/test/{transaction_pending_update_test.cc => base_transaction_test.cc} (93%) diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index d473d72e1..995fdea68 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -41,6 +41,7 @@ configure_file( iceberg_include_dir = include_directories('..') iceberg_sources = files( 'arrow_c_data_guard_internal.cc', + 'base_transaction.cc', 'catalog/memory/in_memory_catalog.cc', 'expression/aggregate.cc', 'expression/binder.cc', @@ -75,6 +76,7 @@ iceberg_sources = files( 'partition_field.cc', 'partition_spec.cc', 'partition_summary.cc', + 'pending_update.cc', 'row/arrow_array_wrapper.cc', 'row/manifest_wrapper.cc', 'row/partition_values.cc', diff --git a/src/iceberg/pending_update.cc b/src/iceberg/pending_update.cc index 6d7f9b224..3e17deb20 100644 --- a/src/iceberg/pending_update.cc +++ b/src/iceberg/pending_update.cc @@ -39,7 +39,10 @@ PropertiesUpdate& PropertiesUpdate::Remove(std::string const& key) { } Result PropertiesUpdate::Apply() { - return PropertiesUpdateChanges{updates_, removals_}; + return PropertiesUpdateChanges{ + .updates = updates_, + .removals = removals_, + }; } Status PropertiesUpdate::ApplyResult(TableMetadataBuilder& builder, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index dba889bc1..32e5fd059 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -71,6 +71,8 @@ add_iceberg_test(table_test SOURCES json_internal_test.cc metrics_config_test.cc + base_transaction_test.cc + pending_update_test.cc schema_json_test.cc table_test.cc table_metadata_builder_test.cc diff --git a/src/iceberg/test/transaction_pending_update_test.cc b/src/iceberg/test/base_transaction_test.cc similarity index 93% rename from src/iceberg/test/transaction_pending_update_test.cc rename to src/iceberg/test/base_transaction_test.cc index cfb6fa775..ff50730d9 100644 --- a/src/iceberg/test/transaction_pending_update_test.cc +++ b/src/iceberg/test/base_transaction_test.cc @@ -17,9 +17,10 @@ * under the License. */ +#include "iceberg/base_transaction.h" + #include -#include "iceberg/base_transaction.h" #include "iceberg/partition_spec.h" #include "iceberg/pending_update.h" #include "iceberg/sort_order.h" @@ -68,7 +69,7 @@ std::shared_ptr
CreateTestTable(const TableIdentifier& identifier, } } // namespace -TEST(TransactionPendingUpdateTest, CommitSetPropertiesUsesCatalog) { +TEST(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { auto metadata = CreateBaseMetadata(); const auto identifier = MakeIdentifier(); auto catalog = std::make_shared>(); @@ -93,13 +94,13 @@ TEST(TransactionPendingUpdateTest, CommitSetPropertiesUsesCatalog) { auto it = updated.find("new-key"); EXPECT_NE(it, updated.end()); EXPECT_EQ("new-value", it->second); - return Result>(std::unique_ptr
()); + return {std::unique_ptr
()}; }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } -TEST(TransactionPendingUpdateTest, RemovePropertiesSkipsMissingKeys) { +TEST(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { auto metadata = CreateBaseMetadata(); const auto identifier = MakeIdentifier(); auto catalog = std::make_shared>(); @@ -121,13 +122,13 @@ TEST(TransactionPendingUpdateTest, RemovePropertiesSkipsMissingKeys) { dynamic_cast(updates.front().get()); EXPECT_NE(remove_update, nullptr); EXPECT_THAT(remove_update->removed(), ElementsAre("existing")); - return Result>(std::unique_ptr
()); + return {std::unique_ptr
()}; }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } -TEST(TransactionPendingUpdateTest, AggregatesMultiplePendingUpdates) { +TEST(BaseTransactionTest, AggregatesMultiplePendingUpdates) { auto metadata = CreateBaseMetadata(); const auto identifier = MakeIdentifier(); auto catalog = std::make_shared>(); @@ -159,7 +160,7 @@ TEST(TransactionPendingUpdateTest, AggregatesMultiplePendingUpdates) { EXPECT_NE(remove_update, nullptr); EXPECT_THAT(remove_update->removed(), ElementsAre("existing")); - return Result>(std::unique_ptr
()); + return {std::unique_ptr
()}; }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); From 0b5647dea369013d50220eefc469206727995715 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 6 Dec 2025 21:37:09 +0800 Subject: [PATCH 06/20] feat: table transactional UpdateProperties support --- src/iceberg/CMakeLists.txt | 2 +- src/iceberg/base_transaction.cc | 113 +++++++++++---- src/iceberg/base_transaction.h | 62 ++++++-- src/iceberg/catalog.h | 6 +- .../catalog/memory/in_memory_catalog.cc | 5 +- .../catalog/memory/in_memory_catalog.h | 8 +- src/iceberg/catalog/rest/rest_catalog.cc | 5 +- src/iceberg/catalog/rest/rest_catalog.h | 6 +- src/iceberg/pending_update.cc | 63 -------- src/iceberg/pending_update.h | 15 -- src/iceberg/result.h | 2 + src/iceberg/table_update.cc | 10 +- src/iceberg/test/CMakeLists.txt | 4 +- src/iceberg/test/base_transaction_test.cc | 124 +++++++--------- src/iceberg/test/mock_catalog.h | 8 +- src/iceberg/test/pending_update_test.cc | 103 -------------- src/iceberg/test/update_properties_test.cc | 8 +- src/iceberg/transaction.h | 4 +- src/iceberg/transaction_catalog.cc | 43 ++++++ src/iceberg/transaction_catalog.h | 134 ++++++++++++++++++ src/iceberg/update/update_properties.cc | 5 +- 21 files changed, 417 insertions(+), 313 deletions(-) delete mode 100644 src/iceberg/pending_update.cc delete mode 100644 src/iceberg/test/pending_update_test.cc create mode 100644 src/iceberg/transaction_catalog.cc create mode 100644 src/iceberg/transaction_catalog.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 30508ebde..ac2a5ca74 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -54,7 +54,6 @@ set(ICEBERG_SOURCES partition_field.cc partition_spec.cc partition_summary.cc - pending_update.cc row/arrow_array_wrapper.cc row/manifest_wrapper.cc row/partition_values.cc @@ -70,6 +69,7 @@ set(ICEBERG_SOURCES table.cc table_metadata.cc table_properties.cc + transaction_catalog.cc table_requirement.cc table_requirements.cc table_scan.cc diff --git a/src/iceberg/base_transaction.cc b/src/iceberg/base_transaction.cc index f22e1f090..78553298b 100644 --- a/src/iceberg/base_transaction.cc +++ b/src/iceberg/base_transaction.cc @@ -19,63 +19,126 @@ #include "iceberg/base_transaction.h" +#include + #include "iceberg/catalog.h" #include "iceberg/pending_update.h" #include "iceberg/table.h" -#include "iceberg/table_metadata.h" -#include "iceberg/table_requirements.h" -#include "iceberg/table_update.h" +#include "iceberg/update/update_properties.h" +#include "iceberg/util/macros.h" namespace iceberg { BaseTransaction::BaseTransaction(std::shared_ptr table, std::shared_ptr catalog) - : table_(std::move(table)), catalog_(std::move(catalog)) { + : table_(std::move(table)) { ICEBERG_DCHECK(table_ != nullptr, "table must not be null"); - ICEBERG_DCHECK(catalog_ != nullptr, "catalog must not be null"); + ICEBERG_DCHECK(catalog != nullptr, "catalog must not be null"); + context_.identifier = table_->name(); + context_.current_metadata = table_->metadata(); + catalog_ = std::make_shared(std::move(catalog), this); } const std::shared_ptr& BaseTransaction::table() const { return table_; } -std::shared_ptr BaseTransaction::UpdateProperties() { - return RegisterUpdate(); +std::unique_ptr BaseTransaction::UpdateProperties() { + auto update = CheckAndCreateUpdate<::iceberg::UpdateProperties>( + table_->name(), catalog_, CurrentMetadata()); + if (!update.has_value()) { + ERROR_TO_EXCEPTION(update.error()); + } + + return std::move(update).value(); } -std::shared_ptr BaseTransaction::NewAppend() { +std::unique_ptr BaseTransaction::NewAppend() { throw NotImplemented("BaseTransaction::NewAppend not implemented"); } Status BaseTransaction::CommitTransaction() { - const auto& metadata = table_->metadata(); - if (!metadata) { - return InvalidArgument("Table metadata is null"); + if (!HasLastOperationCommitted()) { + return InvalidState("Cannot commit transaction: last operation has not committed"); } - auto builder = TableMetadataBuilder::BuildFrom(metadata.get()); - for (const auto& pending_update : pending_updates_) { - if (!pending_update) { - continue; - } - ICEBERG_RETURN_UNEXPECTED(pending_update->Apply(*builder)); + auto pending_updates = ConsumePendingUpdates(); + if (pending_updates.empty()) { + return {}; } - auto table_updates = builder->GetChanges(); - TableUpdateContext context(metadata.get(), /*is_replace=*/false); - for (const auto& update : table_updates) { - ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context)); - } - ICEBERG_ASSIGN_OR_RAISE(auto table_requirements, context.Build()); + auto pending_requirements = ConsumePendingRequirements(); ICEBERG_ASSIGN_OR_RAISE( auto updated_table, - catalog_->UpdateTable(table_->name(), table_requirements, table_updates)); + catalog_->catalog_impl()->UpdateTable( + table_->name(), std::move(pending_requirements), std::move(pending_updates))); + // update table to the new version if (updated_table) { table_ = std::shared_ptr
(std::move(updated_table)); } - pending_updates_.clear(); return {}; } +Result> BaseTransaction::StageUpdates( + const TableIdentifier& identifier, + std::vector> requirements, + std::vector> updates) { + if (identifier != context_.identifier) { + return InvalidArgument("Transaction only supports table '{}'", + context_.identifier.name); + } + + if (!context_.current_metadata) { + return InvalidState("Transaction metadata is not initialized"); + } + + if (updates.empty()) { + return std::make_unique
( + context_.identifier, std::make_shared(*context_.current_metadata), + table_->location(), table_->io(), catalog_->catalog_impl()); + } + + ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates)); + + for (auto& requirement : requirements) { + context_.pending_requirements.emplace_back(std::move(requirement)); + } + for (auto& update : updates) { + context_.pending_updates.emplace_back(std::move(update)); + } + + return std::make_unique
( + context_.identifier, std::make_shared(*context_.current_metadata), + table_->location(), table_->io(), catalog_->catalog_impl()); +} + +Status BaseTransaction::ApplyUpdates( + const std::vector>& updates) { + if (updates.empty()) { + return {}; + } + + auto builder = TableMetadataBuilder::BuildFrom(context_.current_metadata.get()); + for (const auto& update : updates) { + if (!update) { + continue; + } + update->ApplyTo(*builder); + } + + ICEBERG_ASSIGN_OR_RAISE(auto new_metadata, builder->Build()); + context_.current_metadata = std::shared_ptr(std::move(new_metadata)); + return {}; +} + +std::vector> +BaseTransaction::ConsumePendingRequirements() { + return std::exchange(context_.pending_requirements, {}); +} + +std::vector> BaseTransaction::ConsumePendingUpdates() { + return std::exchange(context_.pending_updates, {}); +} + } // namespace iceberg diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index cab21753d..c19ed2bd9 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -19,38 +19,82 @@ #pragma once +#include #include +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/transaction.h" +#include "iceberg/transaction_catalog.h" #include "iceberg/type_fwd.h" namespace iceberg { /// \brief Base class for transaction implementations -class BaseTransaction : public Transaction { +class ICEBERG_EXPORT BaseTransaction : public Transaction { public: BaseTransaction(std::shared_ptr table, std::shared_ptr catalog); ~BaseTransaction() override = default; const std::shared_ptr& table() const override; - std::shared_ptr UpdateProperties() override; + std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() override; - std::shared_ptr NewAppend() override; + std::unique_ptr NewAppend() override; Status CommitTransaction() override; + Result> StageUpdates( + const TableIdentifier& identifier, + std::vector> requirements, + std::vector> updates); + + bool HasLastOperationCommitted() const { return context_.last_operation_committed; } + + void SetLastOperationCommitted(bool committed) { + context_.last_operation_committed = committed; + } + + const std::shared_ptr& CurrentMetadata() const { + return context_.current_metadata; + } + + Status ApplyUpdates(const std::vector>& updates); + + std::vector> ConsumePendingRequirements(); + + std::vector> ConsumePendingUpdates(); + protected: template - std::shared_ptr RegisterUpdate(Args&&... args) { - auto update = std::make_shared(std::forward(args)...); - pending_updates_.push_back(update); - return update; + Result> CheckAndCreateUpdate(Args&&... args) { + if (!HasLastOperationCommitted()) { + return InvalidState( + "Cannot create new update: last operation in transaction has not committed"); + } + SetLastOperationCommitted(false); + return std::make_unique(std::forward(args)...); } + private: + struct TransactionContext { + TransactionContext() = default; + TransactionContext(const TableIdentifier& identifier, + std::shared_ptr metadata) + : identifier(identifier), current_metadata(std::move(metadata)) {} + + bool last_operation_committed = true; + TableIdentifier identifier; + std::shared_ptr current_metadata; + std::vector> pending_requirements; + std::vector> pending_updates; + }; + std::shared_ptr table_; - std::shared_ptr catalog_; - std::vector> pending_updates_; + std::shared_ptr catalog_; + TransactionContext context_; }; } // namespace iceberg diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index 6c4957ade..ee7003655 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -123,8 +123,8 @@ class ICEBERG_EXPORT Catalog { /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists virtual Result> UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) = 0; + const std::vector>& requirements, + const std::vector>& updates) = 0; /// \brief Start a transaction to create a table /// @@ -184,6 +184,8 @@ class ICEBERG_EXPORT Catalog { /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists virtual Result> RegisterTable( const TableIdentifier& identifier, const std::string& metadata_file_location) = 0; + + virtual void SetLastOperationCommitted(bool committed) = 0; }; } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index 9e4a485a0..645ee43eb 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -392,9 +392,8 @@ Result> InMemoryCatalog::CreateTable( Result> InMemoryCatalog::UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) { - std::unique_lock lock(mutex_); + const std::vector>& requirements, + const std::vector>& updates) { return NotImplemented("update table"); } diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index e6a9acbce..ec9a923cf 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -22,6 +22,8 @@ #include #include "iceberg/catalog.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" namespace iceberg { @@ -77,8 +79,8 @@ class ICEBERG_EXPORT InMemoryCatalog Result> UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) override; + const std::vector>& requirements, + const std::vector>& updates) override; Result> StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, @@ -97,6 +99,8 @@ class ICEBERG_EXPORT InMemoryCatalog const TableIdentifier& identifier, const std::string& metadata_file_location) override; + void SetLastOperationCommitted(bool committed) override {} + private: std::string catalog_name_; std::unordered_map properties_; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 4a77f6585..54828479a 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -197,8 +197,9 @@ Result> RestCatalog::CreateTable( Result> RestCatalog::UpdateTable( [[maybe_unused]] const TableIdentifier& identifier, - [[maybe_unused]] const std::vector>& requirements, - [[maybe_unused]] const std::vector>& updates) { + [[maybe_unused]] const std::vector>& + requirements, + [[maybe_unused]] const std::vector>& updates) { return NotImplemented("Not implemented"); } diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 4e191e86f..e3ae95df9 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -76,8 +76,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog { Result> UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) override; + const std::vector>& requirements, + const std::vector>& updates) override; Result> StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, @@ -96,6 +96,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog { const TableIdentifier& identifier, const std::string& metadata_file_location) override; + void SetLastOperationCommitted(bool committed) override {} + private: RestCatalog(std::unique_ptr config, std::unique_ptr paths); diff --git a/src/iceberg/pending_update.cc b/src/iceberg/pending_update.cc deleted file mode 100644 index 3e17deb20..000000000 --- a/src/iceberg/pending_update.cc +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/pending_update.h" - -#include "iceberg/catalog.h" -#include "iceberg/table.h" -#include "iceberg/table_metadata.h" -#include "iceberg/table_update.h" -#include "iceberg/util/macros.h" - -namespace iceberg { - -PropertiesUpdate& PropertiesUpdate::Set(std::string const& key, - std::string const& value) { - updates_[key] = value; - return *this; -} - -PropertiesUpdate& PropertiesUpdate::Remove(std::string const& key) { - removals_.push_back(key); - return *this; -} - -Result PropertiesUpdate::Apply() { - return PropertiesUpdateChanges{ - .updates = updates_, - .removals = removals_, - }; -} - -Status PropertiesUpdate::ApplyResult(TableMetadataBuilder& builder, - PropertiesUpdateChanges result) { - if (!result.updates.empty()) { - builder.SetProperties(result.updates); - } - if (!result.removals.empty()) { - builder.RemoveProperties(result.removals); - } - return {}; -} - -Status PropertiesUpdate::Commit() { - return NotImplemented("UpdateProperties::Commit() not implemented"); -} - -} // namespace iceberg diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h index 5c5cf784a..8370db141 100644 --- a/src/iceberg/pending_update.h +++ b/src/iceberg/pending_update.h @@ -22,14 +22,10 @@ /// \file iceberg/pending_update.h /// API for table changes using builder pattern -#include -#include - #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" -#include "iceberg/util/macros.h" namespace iceberg { @@ -71,17 +67,6 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { protected: PendingUpdate() = default; - - /// \brief Apply the pending changes to a TableMetadataBuilder - /// - /// This method applies the changes by calling builder's specific methods. - /// The builder will automatically record corresponding TableUpdate objects. - /// - /// \param builder The TableMetadataBuilder to apply changes to - /// \return Status::OK if the changes were applied successfully, or an error - virtual Status Apply(TableMetadataBuilder& builder) = 0; - - friend class BaseTransaction; }; } // namespace iceberg diff --git a/src/iceberg/result.h b/src/iceberg/result.h index ddc428a23..6f93dc606 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -43,6 +43,7 @@ enum class ErrorKind { kInvalidManifest, kInvalidManifestList, kInvalidSchema, + kInvalidState, kIOError, kJsonParseError, kNamespaceNotEmpty, @@ -104,6 +105,7 @@ DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidManifest) DEFINE_ERROR_FUNCTION(InvalidManifestList) DEFINE_ERROR_FUNCTION(InvalidSchema) +DEFINE_ERROR_FUNCTION(InvalidState) DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NamespaceNotEmpty) diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index ee5f9d67e..90f7de622 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -180,9 +180,8 @@ void SetProperties::ApplyTo(TableMetadataBuilder& builder) const { builder.SetProperties(updated_); } -Status SetProperties::GenerateRequirements(TableUpdateContext& context) const { - // No requirements - return {}; +void SetProperties::GenerateRequirements(TableUpdateContext& context) const { + // SetProperties doesn't generate any requirements } // RemoveProperties @@ -191,9 +190,8 @@ void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const { builder.RemoveProperties(removed_); } -Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const { - // No requirements - return {}; +void RemoveProperties::GenerateRequirements(TableUpdateContext& context) const { + // RemoveProperties doesn't generate any requirements } // SetLocation diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 32e5fd059..d4957ce35 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -69,17 +69,15 @@ add_iceberg_test(schema_test add_iceberg_test(table_test SOURCES + base_transaction_test.cc json_internal_test.cc metrics_config_test.cc - base_transaction_test.cc - pending_update_test.cc schema_json_test.cc table_test.cc table_metadata_builder_test.cc table_requirement_test.cc table_requirements_test.cc table_update_test.cc - transaction_pending_update_test.cc update_properties_test.cc) add_iceberg_test(expression_test diff --git a/src/iceberg/test/base_transaction_test.cc b/src/iceberg/test/base_transaction_test.cc index ff50730d9..32d8597f9 100644 --- a/src/iceberg/test/base_transaction_test.cc +++ b/src/iceberg/test/base_transaction_test.cc @@ -22,45 +22,16 @@ #include #include "iceberg/partition_spec.h" -#include "iceberg/pending_update.h" -#include "iceberg/sort_order.h" +#include "iceberg/result.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" +#include "iceberg/update/update_properties.h" namespace iceberg { namespace { -using ::testing::ElementsAre; -using ::testing::NiceMock; - -TableIdentifier MakeIdentifier() { - return TableIdentifier{ - .ns = Namespace{.levels = {"test_ns"}}, - .name = "test_table", - }; -} - -std::shared_ptr CreateBaseMetadata() { - auto metadata = std::make_shared(); - metadata->format_version = TableMetadata::kDefaultTableFormatVersion; - metadata->table_uuid = "test-uuid"; - metadata->location = "s3://bucket/table"; - metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber; - metadata->last_updated_ms = - TimePointMs{std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch())}; - metadata->last_column_id = 0; - metadata->default_spec_id = PartitionSpec::kInitialSpecId; - metadata->last_partition_id = 0; - metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; - metadata->next_row_id = TableMetadata::kInitialRowId; - metadata->properties = {{"existing", "value"}}; - return metadata; -} - std::shared_ptr
CreateTestTable(const TableIdentifier& identifier, const std::shared_ptr& metadata, const std::shared_ptr& catalog) { @@ -69,21 +40,35 @@ std::shared_ptr
CreateTestTable(const TableIdentifier& identifier, } } // namespace -TEST(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { - auto metadata = CreateBaseMetadata(); - const auto identifier = MakeIdentifier(); - auto catalog = std::make_shared>(); - auto table = - CreateTestTable(identifier, std::make_shared(*metadata), catalog); - auto transaction = table->NewTransaction(); +class BaseTransactionTest : public ::testing::Test { + protected: + void SetUp() override { + // Create catalog and table identifier + catalog_ = std::make_shared<::testing::NiceMock>(); + + identifier_ = TableIdentifier(Namespace({"test"}), "test_table"); + auto metadata = std::make_shared(); + table_ = + std::make_shared
(identifier_, std::move(metadata), + "s3://bucket/table/metadata.json", nullptr, catalog_); + } + + TableIdentifier identifier_; + std::shared_ptr catalog_; + std::shared_ptr
table_; +}; + +TEST_F(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { + auto transaction = table_->NewTransaction(); auto update_properties = transaction->UpdateProperties(); update_properties->Set("new-key", "new-value"); + EXPECT_THAT(update_properties->Commit(), IsOk()); - EXPECT_CALL(*catalog, - UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) + EXPECT_CALL(*catalog_, + UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) .WillOnce([](const TableIdentifier& id, - const std::vector>& /*requirements*/, - const std::vector>& updates) + std::vector> /*requirements*/, + std::vector> updates) -> Result> { EXPECT_EQ("test_table", id.name); EXPECT_EQ(1u, updates.size()); @@ -100,50 +85,44 @@ TEST(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } -TEST(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { - auto metadata = CreateBaseMetadata(); - const auto identifier = MakeIdentifier(); - auto catalog = std::make_shared>(); - auto table = - CreateTestTable(identifier, std::make_shared(*metadata), catalog); - auto transaction = table->NewTransaction(); - +TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { + auto transaction = table_->NewTransaction(); auto update_properties = transaction->UpdateProperties(); update_properties->Remove("missing").Remove("existing"); + EXPECT_THAT(update_properties->Commit(), IsOk()); - EXPECT_CALL(*catalog, - UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) + EXPECT_CALL(*catalog_, + UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) .WillOnce([](const TableIdentifier&, - const std::vector>& /*requirements*/, - const std::vector>& updates) + std::vector> /*requirements*/, + std::vector> updates) -> Result> { EXPECT_EQ(1u, updates.size()); const auto* remove_update = dynamic_cast(updates.front().get()); EXPECT_NE(remove_update, nullptr); - EXPECT_THAT(remove_update->removed(), ElementsAre("existing")); + EXPECT_THAT(remove_update->removed(), + ::testing::ElementsAre("existing", "missing")); return {std::unique_ptr
()}; }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } -TEST(BaseTransactionTest, AggregatesMultiplePendingUpdates) { - auto metadata = CreateBaseMetadata(); - const auto identifier = MakeIdentifier(); - auto catalog = std::make_shared>(); - auto table = - CreateTestTable(identifier, std::make_shared(*metadata), catalog); - auto transaction = table->NewTransaction(); - +TEST_F(BaseTransactionTest, AggregatesMultiplePendingUpdates) { + auto transaction = table_->NewTransaction(); auto update_properties = transaction->UpdateProperties(); - update_properties->Set("new-key", "new-value").Remove("existing"); + update_properties->Set("new-key", "new-value"); + EXPECT_THAT(update_properties->Commit(), IsOk()); + auto remove_properties = transaction->UpdateProperties(); + remove_properties->Remove("existing"); + EXPECT_THAT(remove_properties->Commit(), IsOk()); - EXPECT_CALL(*catalog, - UpdateTable(::testing::Eq(identifier), ::testing::_, ::testing::_)) + EXPECT_CALL(*catalog_, + UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) .WillOnce([](const TableIdentifier&, - const std::vector>& /*requirements*/, - const std::vector>& updates) + std::vector> /*requirements*/, + std::vector> updates) -> Result> { EXPECT_EQ(2u, updates.size()); @@ -158,7 +137,7 @@ TEST(BaseTransactionTest, AggregatesMultiplePendingUpdates) { const auto* remove_update = dynamic_cast(updates[1].get()); EXPECT_NE(remove_update, nullptr); - EXPECT_THAT(remove_update->removed(), ElementsAre("existing")); + EXPECT_THAT(remove_update->removed(), ::testing::ElementsAre("existing")); return {std::unique_ptr
()}; }); @@ -166,4 +145,11 @@ TEST(BaseTransactionTest, AggregatesMultiplePendingUpdates) { EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } +TEST_F(BaseTransactionTest, FailsIfUpdateNotCommitted) { + auto transaction = table_->NewTransaction(); + auto update_properties = transaction->UpdateProperties(); + update_properties->Set("new-key", "new-value"); + EXPECT_THAT(transaction->CommitTransaction(), IsError(ErrorKind::kInvalidState)); +} + } // namespace iceberg diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h index 46f01c8db..ca89dd8ed 100644 --- a/src/iceberg/test/mock_catalog.h +++ b/src/iceberg/test/mock_catalog.h @@ -23,6 +23,8 @@ #include #include "iceberg/catalog.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" namespace iceberg { @@ -62,8 +64,8 @@ class MockCatalog : public Catalog { MOCK_METHOD((Result>), UpdateTable, (const TableIdentifier&, - (const std::vector>&), - (const std::vector>&)), + const std::vector>&, + const std::vector>&), (override)); MOCK_METHOD((Result>), StageCreateTable, @@ -83,6 +85,8 @@ class MockCatalog : public Catalog { MOCK_METHOD((Result>), RegisterTable, (const TableIdentifier&, const std::string&), (override)); + + MOCK_METHOD(void, SetLastOperationCommitted, (bool), (override)); }; } // namespace iceberg diff --git a/src/iceberg/test/pending_update_test.cc b/src/iceberg/test/pending_update_test.cc deleted file mode 100644 index 2def1f011..000000000 --- a/src/iceberg/test/pending_update_test.cc +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/pending_update.h" - -#include - -#include "iceberg/result.h" -#include "iceberg/test/matchers.h" - -namespace iceberg { - -// Mock implementation for testing the interface -class MockSnapshot {}; - -class MockPendingUpdate : public PendingUpdateTyped { - public: - MockPendingUpdate() = default; - - Result Apply() override { - if (should_fail_) { - return ValidationFailed("Mock validation failed"); - } - apply_called_ = true; - return MockSnapshot{}; - } - - Status ApplyResult(TableMetadataBuilder& builder, MockSnapshot result) override { - return {}; - } - - Status Commit() override { - if (should_fail_commit_) { - return CommitFailed("Mock commit failed"); - } - commit_called_ = true; - return {}; - } - - void SetShouldFail(bool fail) { should_fail_ = fail; } - void SetShouldFailCommit(bool fail) { should_fail_commit_ = fail; } - bool ApplyCalled() const { return apply_called_; } - bool CommitCalled() const { return commit_called_; } - - private: - bool should_fail_ = false; - bool should_fail_commit_ = false; - bool apply_called_ = false; - bool commit_called_ = false; -}; - -TEST(PendingUpdateTest, ApplySuccess) { - MockPendingUpdate update; - auto result = update.Apply(); - EXPECT_THAT(result, IsOk()); -} - -TEST(PendingUpdateTest, ApplyValidationFailed) { - MockPendingUpdate update; - update.SetShouldFail(true); - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("Mock validation failed")); -} - -TEST(PendingUpdateTest, CommitSuccess) { - MockPendingUpdate update; - auto status = update.Commit(); - EXPECT_THAT(status, IsOk()); - EXPECT_TRUE(update.CommitCalled()); -} - -TEST(PendingUpdateTest, CommitFailed) { - MockPendingUpdate update; - update.SetShouldFailCommit(true); - auto status = update.Commit(); - EXPECT_THAT(status, IsError(ErrorKind::kCommitFailed)); - EXPECT_THAT(status, HasErrorMessage("Mock commit failed")); -} - -TEST(PendingUpdateTest, BaseClassPolymorphism) { - std::unique_ptr base_ptr = std::make_unique(); - auto status = base_ptr->Commit(); - EXPECT_THAT(status, IsOk()); -} - -} // namespace iceberg diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 13cfec831..6517be2b3 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -50,7 +50,7 @@ class UpdatePropertiesTest : public ::testing::Test { metadata_->schemas.push_back(schema_); // Create catalog and table identifier - catalog_ = std::make_shared(); + catalog_ = std::make_shared<::testing::NiceMock>(); identifier_ = TableIdentifier(Namespace({"test"}), "table"); } @@ -173,7 +173,9 @@ TEST_F(UpdatePropertiesTest, Commit) { UpdateProperties update(identifier_, catalog_, metadata_); update.Set("key1", "value1"); - EXPECT_CALL(*catalog_, UpdateTable).Times(1).WillOnce(::testing::Return(nullptr)); + EXPECT_CALL(*catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .Times(1) + .WillOnce(::testing::Return(nullptr)); auto result = update.Commit(); EXPECT_THAT(result, IsOk()); @@ -184,7 +186,7 @@ TEST_F(UpdatePropertiesTest, Commit) { UpdateProperties update(identifier_, catalog_, metadata_); update.Set("key1", "value1"); - EXPECT_CALL(*catalog_, UpdateTable) + EXPECT_CALL(*catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) .WillOnce(::testing::Return(CommitFailed("Commit update failed"))); auto result = update.Commit(); EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 7b54b65ea..f462345af 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -41,12 +41,12 @@ class ICEBERG_EXPORT Transaction { /// \brief Create a new update properties operation /// /// \return a new UpdateProperties - virtual std::shared_ptr UpdateProperties() = 0; + virtual std::unique_ptr UpdateProperties() = 0; /// \brief Create a new append API to add files to this table /// /// \return a new AppendFiles - virtual std::shared_ptr NewAppend() = 0; + virtual std::unique_ptr NewAppend() = 0; /// \brief Apply the pending changes from all actions and commit /// diff --git a/src/iceberg/transaction_catalog.cc b/src/iceberg/transaction_catalog.cc new file mode 100644 index 000000000..7f1ebe4dd --- /dev/null +++ b/src/iceberg/transaction_catalog.cc @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/transaction_catalog.h" + +#include "iceberg/base_transaction.h" + +namespace iceberg { + +Result> TransactionCatalog::UpdateTable( + const TableIdentifier& identifier, + std::vector> requirements, + std::vector> updates) { + if (!owner_) { + return InvalidState("Transaction state is unavailable"); + } + + return owner_->StageUpdates(identifier, std::move(requirements), std::move(updates)); +} + +void TransactionCatalog::SetLastOperationCommitted(bool committed) { + if (owner_) { + owner_->SetLastOperationCommitted(committed); + } +} + +} // namespace iceberg diff --git a/src/iceberg/transaction_catalog.h b/src/iceberg/transaction_catalog.h new file mode 100644 index 000000000..b14e58293 --- /dev/null +++ b/src/iceberg/transaction_catalog.h @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/catalog.h" +#include "iceberg/result.h" +#include "iceberg/table.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +class BaseTransaction; + +/** + * @brief Lightweight catalog wrapper for BaseTransaction. + * + * For read-only operations, TransactionCatalog simply forwards to the wrapped catalog. + * For mutating calls such as UpdateTable or HasLastOperationCommitted, it delegates back + * to the owning BaseTransaction so staged updates remain private until commit. + */ +class ICEBERG_EXPORT TransactionCatalog : public Catalog { + public: + TransactionCatalog(std::shared_ptr catalog, BaseTransaction* owner) + : catalog_impl_(std::move(catalog)), owner_(owner) {} + ~TransactionCatalog() override = default; + + std::string_view name() const override { return catalog_impl_->name(); }; + + Status CreateNamespace( + const Namespace& ns, + const std::unordered_map& properties) override { + return catalog_impl_->CreateNamespace(ns, properties); + } + + Result> ListNamespaces(const Namespace& ns) const override { + return catalog_impl_->ListNamespaces(ns); + } + + Result> GetNamespaceProperties( + const Namespace& ns) const override { + return catalog_impl_->GetNamespaceProperties(ns); + } + + Status DropNamespace(const Namespace& ns) override { + // Will do nothing for directly dropping namespaces. + return NotSupported("DropNamespace is not supported in TransactionCatalog."); + } + + Result NamespaceExists(const Namespace& ns) const override { + return catalog_impl_->NamespaceExists(ns); + } + + Status UpdateNamespaceProperties( + const Namespace& ns, const std::unordered_map& updates, + const std::unordered_set& removals) override { + // Will do nothing for directly updating namespace properties. + return NotSupported( + "UpdateNamespaceProperties is not supported in TransactionCatalog."); + } + + Result> ListTables(const Namespace& ns) const override { + return catalog_impl_->ListTables(ns); + } + + Result> CreateTable( + const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, + const std::string& location, + const std::unordered_map& properties) override { + return NotImplemented("CreateTable is not implemented in TransactionCatalog."); + } + + Result> UpdateTable( + const TableIdentifier& identifier, + std::vector> requirements, + std::vector> updates) override; + + Result> StageCreateTable( + const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, + const std::string& location, + const std::unordered_map& properties) override { + return NotImplemented("StageCreateTable is not implemented in TransactionCatalog."); + } + + Result TableExists(const TableIdentifier& identifier) const override { + return catalog_impl_->TableExists(identifier); + } + + Status DropTable(const TableIdentifier& identifier, bool purge) override { + return NotSupported("DropTable is not supported in TransactionCatalog."); + } + + Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override { + return NotImplemented("rename table"); + } + + Result> LoadTable(const TableIdentifier& identifier) override { + return catalog_impl_->LoadTable(identifier); + } + + Result> RegisterTable( + const TableIdentifier& identifier, + const std::string& metadata_file_location) override { + return NotImplemented("register table"); + } + + void SetLastOperationCommitted(bool committed) override; + + const std::shared_ptr& catalog_impl() const { return catalog_impl_; } + + private: + std::shared_ptr catalog_impl_; + BaseTransaction* owner_; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index a4dcd1548..76261da42 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -126,8 +126,11 @@ Status UpdateProperties::Commit() { if (!updates.empty()) { ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable(*base_metadata_, updates)); - ICEBERG_RETURN_UNEXPECTED(catalog_->UpdateTable(identifier_, requirements, updates)); + ICEBERG_RETURN_UNEXPECTED( + catalog_->UpdateTable(identifier_, std::move(requirements), std::move(updates))); } + + catalog_->SetLastOperationCommitted(true); return {}; } From 492bbed3b67d7421961e086113f6e1fd427f7cf5 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 6 Dec 2025 22:26:58 +0800 Subject: [PATCH 07/20] feat: transactional UpdateProperties method support --- src/iceberg/CMakeLists.txt | 2 +- src/iceberg/base_transaction.cc | 2 ++ src/iceberg/base_transaction.h | 2 -- src/iceberg/catalog.h | 3 +++ src/iceberg/catalog/memory/in_memory_catalog.h | 2 -- src/iceberg/meson.build | 2 +- src/iceberg/table.cc | 2 -- src/iceberg/table_metadata.cc | 13 ------------- src/iceberg/table_metadata.h | 17 ----------------- src/iceberg/transaction.h | 2 +- src/iceberg/transaction_catalog.h | 2 -- src/iceberg/type_fwd.h | 6 +++--- 12 files changed, 11 insertions(+), 44 deletions(-) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ac2a5ca74..5576d9c9f 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -69,11 +69,11 @@ set(ICEBERG_SOURCES table.cc table_metadata.cc table_properties.cc - transaction_catalog.cc table_requirement.cc table_requirements.cc table_scan.cc table_update.cc + transaction_catalog.cc transform.cc transform_function.cc type.cc diff --git a/src/iceberg/base_transaction.cc b/src/iceberg/base_transaction.cc index 78553298b..fd58432bf 100644 --- a/src/iceberg/base_transaction.cc +++ b/src/iceberg/base_transaction.cc @@ -24,6 +24,8 @@ #include "iceberg/catalog.h" #include "iceberg/pending_update.h" #include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction_catalog.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index c19ed2bd9..5c2604429 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -23,11 +23,9 @@ #include #include "iceberg/table_identifier.h" -#include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" #include "iceberg/table_update.h" #include "iceberg/transaction.h" -#include "iceberg/transaction_catalog.h" #include "iceberg/type_fwd.h" namespace iceberg { diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index ee7003655..32b763347 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -185,6 +185,9 @@ class ICEBERG_EXPORT Catalog { virtual Result> RegisterTable( const TableIdentifier& identifier, const std::string& metadata_file_location) = 0; + /// \brief Set whether the last operation in a transaction has been committed + /// + /// \param committed true if the last operation has been committed, false otherwise virtual void SetLastOperationCommitted(bool committed) = 0; }; diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index ec9a923cf..c15756cda 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -22,8 +22,6 @@ #include #include "iceberg/catalog.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" namespace iceberg { diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 995fdea68..ac2379020 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -76,7 +76,6 @@ iceberg_sources = files( 'partition_field.cc', 'partition_spec.cc', 'partition_summary.cc', - 'pending_update.cc', 'row/arrow_array_wrapper.cc', 'row/manifest_wrapper.cc', 'row/partition_values.cc', @@ -96,6 +95,7 @@ iceberg_sources = files( 'table_requirements.cc', 'table_scan.cc', 'table_update.cc', + 'transaction_catalog.cc', 'transform.cc', 'transform_function.cc', 'type.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index ffc33062b..5251ee6bd 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,8 +19,6 @@ #include "iceberg/table.h" -#include - #include "iceberg/base_transaction.h" #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index acb5857ac..a90c5b0ca 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -670,17 +670,4 @@ Result> TableMetadataBuilder::Build() { return result; } -std::vector> TableMetadataBuilder::GetChanges() { - return std::move(impl_->changes); -} - -std::vector TableMetadataBuilder::GetChangesView() const { - std::vector result; - result.reserve(impl_->changes.size()); - for (const auto& change : impl_->changes) { - result.push_back(change.get()); - } - return result; -} - } // namespace iceberg diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 8470e0327..c00bd5e84 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -421,23 +421,6 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \brief Destructor ~TableMetadataBuilder() override; - /// \brief Get all recorded TableUpdate changes - /// - /// This method extracts all TableUpdate objects that were automatically - /// recorded when modification methods were called. The changes are moved - /// out of the builder, so this method should only be called once. - /// - /// \return A vector of TableUpdate objects representing all changes - std::vector> GetChanges(); - - /// \brief Get all recorded TableUpdate changes (const version for inspection) - /// - /// This method returns a view of all TableUpdate objects without moving them. - /// Useful for inspection before committing. - /// - /// \return A vector of const pointers to TableUpdate objects - std::vector GetChangesView() const; - // Delete copy operations (use BuildFrom to create a new builder) TableMetadataBuilder(const TableMetadataBuilder&) = delete; TableMetadataBuilder& operator=(const TableMetadataBuilder&) = delete; diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index f462345af..26eed18de 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -41,7 +41,7 @@ class ICEBERG_EXPORT Transaction { /// \brief Create a new update properties operation /// /// \return a new UpdateProperties - virtual std::unique_ptr UpdateProperties() = 0; + virtual std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() = 0; /// \brief Create a new append API to add files to this table /// diff --git a/src/iceberg/transaction_catalog.h b/src/iceberg/transaction_catalog.h index b14e58293..6008dd719 100644 --- a/src/iceberg/transaction_catalog.h +++ b/src/iceberg/transaction_catalog.h @@ -28,8 +28,6 @@ namespace iceberg { -class BaseTransaction; - /** * @brief Lightweight catalog wrapper for BaseTransaction. * diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index e36fdf298..ed66fd721 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -87,6 +87,7 @@ class UuidType; struct Namespace; struct TableIdentifier; +class BaseTransaction; class Catalog; class FileIO; class LocationProvider; @@ -95,6 +96,7 @@ class SortOrder; class Table; class TableProperties; class Transaction; +class TransactionCatalog; class Transform; class TransformFunction; @@ -154,7 +156,6 @@ class MapLike; class StructLike; class StructLikeAccessor; -class TableUpdate; class AddPartitionSpec; class AddSchema; class AddSnapshot; @@ -172,6 +173,7 @@ class SetDefaultSortOrder; class SetLocation; class SetProperties; class SetSnapshotRef; +class TableUpdate; class UpgradeFormatVersion; class TableRequirement; @@ -179,8 +181,6 @@ class TableMetadataBuilder; class TableUpdateContext; class PendingUpdate; -template -class PendingUpdateTyped; class UpdateProperties; struct PropertiesUpdateChanges; From 601bd970722bb62108e3ca8c7602085f1277e18a Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sun, 7 Dec 2025 09:28:32 +0800 Subject: [PATCH 08/20] feat: transactional UpdateProperties method support --- src/iceberg/catalog/memory/in_memory_catalog.h | 2 ++ src/iceberg/catalog/rest/rest_catalog.h | 2 ++ src/iceberg/test/mock_catalog.h | 1 + 3 files changed, 5 insertions(+) diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index c15756cda..ec9a923cf 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -22,6 +22,8 @@ #include #include "iceberg/catalog.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" namespace iceberg { diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index e3ae95df9..3ef5adea6 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -26,6 +26,8 @@ #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/type_fwd.h" #include "iceberg/result.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" /// \file iceberg/catalog/rest/rest_catalog.h /// RestCatalog implementation for Iceberg REST API. diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h index ca89dd8ed..0509db306 100644 --- a/src/iceberg/test/mock_catalog.h +++ b/src/iceberg/test/mock_catalog.h @@ -23,6 +23,7 @@ #include #include "iceberg/catalog.h" +#include "iceberg/table.h" #include "iceberg/table_requirement.h" #include "iceberg/table_update.h" From 49734a67a932092a1f4bf2dc451a885eab9069c7 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sun, 7 Dec 2025 10:15:05 +0800 Subject: [PATCH 09/20] feat: transactional UpdateProperties method support --- src/iceberg/base_transaction.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index 5c2604429..c1c29518f 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -83,6 +83,11 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { std::shared_ptr metadata) : identifier(identifier), current_metadata(std::move(metadata)) {} + TransactionContext(const TransactionContext&) = delete; + TransactionContext& operator=(const TransactionContext&) = delete; + TransactionContext(TransactionContext&&) noexcept = default; + TransactionContext& operator=(TransactionContext&&) noexcept = default; + bool last_operation_committed = true; TableIdentifier identifier; std::shared_ptr current_metadata; From a88e27bf59f510d4c9d9813ef9f63ce639f7bdf6 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sun, 7 Dec 2025 11:41:12 +0800 Subject: [PATCH 10/20] feat: transactional UpdateProperties method support --- src/iceberg/base_transaction.h | 4 ++-- src/iceberg/test/base_transaction_test.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index c1c29518f..a147666b2 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -79,9 +79,9 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { private: struct TransactionContext { TransactionContext() = default; - TransactionContext(const TableIdentifier& identifier, + TransactionContext(TableIdentifier identifier, std::shared_ptr metadata) - : identifier(identifier), current_metadata(std::move(metadata)) {} + : identifier(std::move(identifier)), current_metadata(std::move(metadata)) {} TransactionContext(const TransactionContext&) = delete; TransactionContext& operator=(const TransactionContext&) = delete; diff --git a/src/iceberg/test/base_transaction_test.cc b/src/iceberg/test/base_transaction_test.cc index 32d8597f9..bab1adeac 100644 --- a/src/iceberg/test/base_transaction_test.cc +++ b/src/iceberg/test/base_transaction_test.cc @@ -102,7 +102,7 @@ TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { dynamic_cast(updates.front().get()); EXPECT_NE(remove_update, nullptr); EXPECT_THAT(remove_update->removed(), - ::testing::ElementsAre("existing", "missing")); + ::testing::UnorderedElementsAre("missing", "existing")); return {std::unique_ptr
()}; }); From cd00a7b2285b1057aa7b50a6cdd5804a15bce651 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Mon, 8 Dec 2025 10:04:56 +0800 Subject: [PATCH 11/20] feat: transactional UpdateProperties method support --- src/iceberg/base_transaction.h | 49 +++++++++++++++++++++++++------ src/iceberg/transaction_catalog.h | 3 ++ 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index a147666b2..e0ddfcec8 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -44,28 +44,40 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { Status CommitTransaction() override; + /// \brief Stage updates to be applied upon commit + /// + /// \param identifier the table identifier + /// \param requirements the list of table requirements to validate + /// \param updates the list of table updates to apply + /// \return a new Table instance with staged updates applied Result> StageUpdates( const TableIdentifier& identifier, std::vector> requirements, std::vector> updates); + /// \brief Whether the last operation has been committed + /// + /// \return true if the last operation was committed, false otherwise bool HasLastOperationCommitted() const { return context_.last_operation_committed; } + /// \brief Mark the last operation as committed or not + /// + /// \param committed true if the last operation was committed, false otherwise void SetLastOperationCommitted(bool committed) { context_.last_operation_committed = committed; } - const std::shared_ptr& CurrentMetadata() const { - return context_.current_metadata; - } - + protected: + /// \brief Apply a list of table updates to the current metadata + /// + /// \param updates the list of table updates to apply + /// \return Status::OK if the updates were applied successfully, or an error status Status ApplyUpdates(const std::vector>& updates); - std::vector> ConsumePendingRequirements(); - - std::vector> ConsumePendingUpdates(); - - protected: + /// \brief Create a new table update if the last operation has been committed + /// + /// \param args the arguments to forward to the update constructor + /// \return a new UpdateType instance or an error status template Result> CheckAndCreateUpdate(Args&&... args) { if (!HasLastOperationCommitted()) { @@ -76,13 +88,32 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { return std::make_unique(std::forward(args)...); } + /// \brief Get the current table metadata in the transaction + /// + /// \return the current TableMetadata + const std::shared_ptr& CurrentMetadata() const { + return context_.current_metadata; + } + + /// \brief Move out all pending table requirements and clear the internal queue. + /// + /// \return the list of pending table requirements + std::vector> ConsumePendingRequirements(); + + /// \brief Move out all pending table updates and clear the internal queue. + /// + /// \return the list of pending table updates + std::vector> ConsumePendingUpdates(); + private: + /// \brief Context for transaction struct TransactionContext { TransactionContext() = default; TransactionContext(TableIdentifier identifier, std::shared_ptr metadata) : identifier(std::move(identifier)), current_metadata(std::move(metadata)) {} + // Non-copyable, movable TransactionContext(const TransactionContext&) = delete; TransactionContext& operator=(const TransactionContext&) = delete; TransactionContext(TransactionContext&&) noexcept = default; diff --git a/src/iceberg/transaction_catalog.h b/src/iceberg/transaction_catalog.h index 6008dd719..a6f30c91b 100644 --- a/src/iceberg/transaction_catalog.h +++ b/src/iceberg/transaction_catalog.h @@ -122,6 +122,9 @@ class ICEBERG_EXPORT TransactionCatalog : public Catalog { void SetLastOperationCommitted(bool committed) override; + /// \brief Get the underlying catalog implementation + /// + /// \return the shared pointer to the underlying catalog const std::shared_ptr& catalog_impl() const { return catalog_impl_; } private: From 43c83623adae2b86fcb43458e85e68d1aba25de8 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Mon, 8 Dec 2025 11:38:15 +0800 Subject: [PATCH 12/20] feat: transactional UpdateProperties method support --- src/iceberg/catalog/memory/in_memory_catalog.h | 2 -- src/iceberg/catalog/rest/rest_catalog.h | 2 -- src/iceberg/test/in_memory_catalog_test.cc | 2 ++ src/iceberg/test/mock_catalog.h | 3 --- src/iceberg/test/update_properties_test.cc | 2 ++ 5 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index ec9a923cf..c15756cda 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -22,8 +22,6 @@ #include #include "iceberg/catalog.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" namespace iceberg { diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 3ef5adea6..e3ae95df9 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -26,8 +26,6 @@ #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/type_fwd.h" #include "iceberg/result.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" /// \file iceberg/catalog/rest/rest_catalog.h /// RestCatalog implementation for Iceberg REST API. diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index f7e2f50a9..617b4a876 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -29,6 +29,8 @@ #include "iceberg/schema.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" #include "iceberg/test/test_resource.h" diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h index 0509db306..ae876a792 100644 --- a/src/iceberg/test/mock_catalog.h +++ b/src/iceberg/test/mock_catalog.h @@ -23,9 +23,6 @@ #include #include "iceberg/catalog.h" -#include "iceberg/table.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" namespace iceberg { diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 6517be2b3..4f180d8f7 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -33,6 +33,8 @@ #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" From b1ac47800f7818f174bb006a82d133cfcac1d445 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Mon, 8 Dec 2025 12:01:30 +0800 Subject: [PATCH 13/20] feat: add update table properties support --- src/iceberg/catalog/memory/in_memory_catalog.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index 645ee43eb..a41a52d4b 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -24,6 +24,8 @@ #include "iceberg/table.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/util/macros.h" namespace iceberg { From 93477bf8bb17cb0dc6f58bd5740a5a37c237e10a Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Mon, 8 Dec 2025 13:03:21 +0800 Subject: [PATCH 14/20] feat: transactional UpdateProperties method support --- src/iceberg/catalog/rest/rest_catalog.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 54828479a..62cf3c3aa 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -31,13 +31,14 @@ #include "iceberg/catalog/rest/http_client.h" #include "iceberg/catalog/rest/json_internal.h" #include "iceberg/catalog/rest/resource_paths.h" -#include "iceberg/catalog/rest/rest_catalog.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/json_internal.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/table.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/util/macros.h" namespace iceberg::rest { From eace25902fc4e438b5b98b38e1fae970982b696e Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 13 Dec 2025 16:34:44 +0800 Subject: [PATCH 15/20] feat: transactional UpdateProperties method support --- src/iceberg/base_transaction.cc | 61 ++++------ src/iceberg/base_transaction.h | 47 ++----- src/iceberg/table.h | 3 + src/iceberg/test/base_transaction_test.cc | 142 +++++++++++----------- src/iceberg/transaction.h | 4 +- src/iceberg/transaction_catalog.cc | 19 ++- src/iceberg/transaction_catalog.h | 8 +- src/iceberg/update/update_properties.cc | 17 ++- src/iceberg/update/update_properties.h | 2 +- 9 files changed, 144 insertions(+), 159 deletions(-) diff --git a/src/iceberg/base_transaction.cc b/src/iceberg/base_transaction.cc index fd58432bf..bd20d8bdf 100644 --- a/src/iceberg/base_transaction.cc +++ b/src/iceberg/base_transaction.cc @@ -19,8 +19,6 @@ #include "iceberg/base_transaction.h" -#include - #include "iceberg/catalog.h" #include "iceberg/pending_update.h" #include "iceberg/table.h" @@ -43,17 +41,19 @@ BaseTransaction::BaseTransaction(std::shared_ptr table, const std::shared_ptr& BaseTransaction::table() const { return table_; } -std::unique_ptr BaseTransaction::UpdateProperties() { - auto update = CheckAndCreateUpdate<::iceberg::UpdateProperties>( - table_->name(), catalog_, CurrentMetadata()); - if (!update.has_value()) { - ERROR_TO_EXCEPTION(update.error()); +Result> BaseTransaction::NewUpdateProperties() { + if (!HasLastOperationCommitted()) { + return InvalidState( + "Cannot create new update: last operation in transaction has not committed"); } + SetLastOperationCommitted(false); - return std::move(update).value(); + auto metadata = std::make_shared(*context_.current_metadata); + return std::make_unique(table_->name(), catalog_, + std::move(metadata)); } -std::unique_ptr BaseTransaction::NewAppend() { +Result> BaseTransaction::NewAppend() { throw NotImplemented("BaseTransaction::NewAppend not implemented"); } @@ -62,30 +62,25 @@ Status BaseTransaction::CommitTransaction() { return InvalidState("Cannot commit transaction: last operation has not committed"); } - auto pending_updates = ConsumePendingUpdates(); - if (pending_updates.empty()) { + if (context_.pending_updates.empty()) { return {}; } - auto pending_requirements = ConsumePendingRequirements(); - ICEBERG_ASSIGN_OR_RAISE( auto updated_table, catalog_->catalog_impl()->UpdateTable( - table_->name(), std::move(pending_requirements), std::move(pending_updates))); + context_.identifier, context_.pending_requirements, context_.pending_updates)); - // update table to the new version - if (updated_table) { - table_ = std::shared_ptr
(std::move(updated_table)); - } + context_.pending_requirements.clear(); + context_.pending_updates.clear(); return {}; } Result> BaseTransaction::StageUpdates( const TableIdentifier& identifier, - std::vector> requirements, - std::vector> updates) { + const std::vector>& requirements, + const std::vector>& updates) { if (identifier != context_.identifier) { return InvalidArgument("Transaction only supports table '{}'", context_.identifier.name); @@ -98,25 +93,22 @@ Result> BaseTransaction::StageUpdates( if (updates.empty()) { return std::make_unique
( context_.identifier, std::make_shared(*context_.current_metadata), - table_->location(), table_->io(), catalog_->catalog_impl()); + table_->metadata_location(), table_->io(), catalog_->catalog_impl()); } ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates)); - - for (auto& requirement : requirements) { - context_.pending_requirements.emplace_back(std::move(requirement)); - } - for (auto& update : updates) { - context_.pending_updates.emplace_back(std::move(update)); - } + context_.pending_requirements.insert(context_.pending_requirements.end(), + requirements.begin(), requirements.end()); + context_.pending_updates.insert(context_.pending_updates.end(), updates.begin(), + updates.end()); return std::make_unique
( context_.identifier, std::make_shared(*context_.current_metadata), - table_->location(), table_->io(), catalog_->catalog_impl()); + table_->metadata_location(), table_->io(), catalog_->catalog_impl()); } Status BaseTransaction::ApplyUpdates( - const std::vector>& updates) { + const std::vector>& updates) { if (updates.empty()) { return {}; } @@ -134,13 +126,4 @@ Status BaseTransaction::ApplyUpdates( return {}; } -std::vector> -BaseTransaction::ConsumePendingRequirements() { - return std::exchange(context_.pending_requirements, {}); -} - -std::vector> BaseTransaction::ConsumePendingUpdates() { - return std::exchange(context_.pending_updates, {}); -} - } // namespace iceberg diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h index e0ddfcec8..d7f148c68 100644 --- a/src/iceberg/base_transaction.h +++ b/src/iceberg/base_transaction.h @@ -38,9 +38,9 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { const std::shared_ptr& table() const override; - std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() override; + Result> NewUpdateProperties() override; - std::unique_ptr NewAppend() override; + Result> NewAppend() override; Status CommitTransaction() override; @@ -52,8 +52,8 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { /// \return a new Table instance with staged updates applied Result> StageUpdates( const TableIdentifier& identifier, - std::vector> requirements, - std::vector> updates); + const std::vector>& requirements, + const std::vector>& updates); /// \brief Whether the last operation has been committed /// @@ -72,38 +72,7 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { /// /// \param updates the list of table updates to apply /// \return Status::OK if the updates were applied successfully, or an error status - Status ApplyUpdates(const std::vector>& updates); - - /// \brief Create a new table update if the last operation has been committed - /// - /// \param args the arguments to forward to the update constructor - /// \return a new UpdateType instance or an error status - template - Result> CheckAndCreateUpdate(Args&&... args) { - if (!HasLastOperationCommitted()) { - return InvalidState( - "Cannot create new update: last operation in transaction has not committed"); - } - SetLastOperationCommitted(false); - return std::make_unique(std::forward(args)...); - } - - /// \brief Get the current table metadata in the transaction - /// - /// \return the current TableMetadata - const std::shared_ptr& CurrentMetadata() const { - return context_.current_metadata; - } - - /// \brief Move out all pending table requirements and clear the internal queue. - /// - /// \return the list of pending table requirements - std::vector> ConsumePendingRequirements(); - - /// \brief Move out all pending table updates and clear the internal queue. - /// - /// \return the list of pending table updates - std::vector> ConsumePendingUpdates(); + Status ApplyUpdates(const std::vector>& updates); private: /// \brief Context for transaction @@ -122,13 +91,15 @@ class ICEBERG_EXPORT BaseTransaction : public Transaction { bool last_operation_committed = true; TableIdentifier identifier; std::shared_ptr current_metadata; - std::vector> pending_requirements; - std::vector> pending_updates; + std::vector> pending_requirements; + std::vector> pending_updates; }; std::shared_ptr table_; std::shared_ptr catalog_; TransactionContext context_; + + friend class TransactionCatalog; }; } // namespace iceberg diff --git a/src/iceberg/table.h b/src/iceberg/table.h index cabd719d3..210ff19fe 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -87,6 +87,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Return the table's base location const std::string& location() const; + /// \brief Return the table's metadata file location + const std::string& metadata_location() const { return metadata_location_; } + /// \brief Return the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; diff --git a/src/iceberg/test/base_transaction_test.cc b/src/iceberg/test/base_transaction_test.cc index bab1adeac..8e3a5d1dc 100644 --- a/src/iceberg/test/base_transaction_test.cc +++ b/src/iceberg/test/base_transaction_test.cc @@ -19,6 +19,8 @@ #include "iceberg/base_transaction.h" +#include + #include #include "iceberg/partition_spec.h" @@ -31,14 +33,6 @@ #include "iceberg/update/update_properties.h" namespace iceberg { -namespace { -std::shared_ptr
CreateTestTable(const TableIdentifier& identifier, - const std::shared_ptr& metadata, - const std::shared_ptr& catalog) { - return std::make_shared
(identifier, metadata, "s3://bucket/table/metadata.json", - nullptr, catalog); -} -} // namespace class BaseTransactionTest : public ::testing::Test { protected: @@ -60,95 +54,103 @@ class BaseTransactionTest : public ::testing::Test { TEST_F(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { auto transaction = table_->NewTransaction(); - auto update_properties = transaction->UpdateProperties(); - update_properties->Set("new-key", "new-value"); - EXPECT_THAT(update_properties->Commit(), IsOk()); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Set("new-key", "new-value"); + EXPECT_THAT(update_properties.value()->Commit(), IsOk()); EXPECT_CALL(*catalog_, UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) - .WillOnce([](const TableIdentifier& id, - std::vector> /*requirements*/, - std::vector> updates) - -> Result> { - EXPECT_EQ("test_table", id.name); - EXPECT_EQ(1u, updates.size()); - const auto* set_update = - dynamic_cast(updates.front().get()); - EXPECT_NE(set_update, nullptr); - const auto& updated = set_update->updated(); - auto it = updated.find("new-key"); - EXPECT_NE(it, updated.end()); - EXPECT_EQ("new-value", it->second); - return {std::unique_ptr
()}; - }); + .WillOnce( + [](const TableIdentifier& id, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ("test_table", id.name); + EXPECT_EQ(1u, updates.size()); + const auto* set_update = + dynamic_cast(updates.front().get()); + EXPECT_NE(set_update, nullptr); + const auto& updated = set_update->updated(); + auto it = updated.find("new-key"); + EXPECT_NE(it, updated.end()); + EXPECT_EQ("new-value", it->second); + return {std::unique_ptr
()}; + }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { auto transaction = table_->NewTransaction(); - auto update_properties = transaction->UpdateProperties(); - update_properties->Remove("missing").Remove("existing"); - EXPECT_THAT(update_properties->Commit(), IsOk()); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Remove("missing").Remove("existing"); + EXPECT_THAT(update_properties.value()->Commit(), IsOk()); EXPECT_CALL(*catalog_, UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) - .WillOnce([](const TableIdentifier&, - std::vector> /*requirements*/, - std::vector> updates) - -> Result> { - EXPECT_EQ(1u, updates.size()); - const auto* remove_update = - dynamic_cast(updates.front().get()); - EXPECT_NE(remove_update, nullptr); - EXPECT_THAT(remove_update->removed(), - ::testing::UnorderedElementsAre("missing", "existing")); - return {std::unique_ptr
()}; - }); + .WillOnce( + [](const TableIdentifier&, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ(1u, updates.size()); + const auto* remove_update = + dynamic_cast(updates.front().get()); + EXPECT_NE(remove_update, nullptr); + EXPECT_THAT(remove_update->removed(), + ::testing::UnorderedElementsAre("missing", "existing")); + return {std::unique_ptr
()}; + }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } TEST_F(BaseTransactionTest, AggregatesMultiplePendingUpdates) { auto transaction = table_->NewTransaction(); - auto update_properties = transaction->UpdateProperties(); - update_properties->Set("new-key", "new-value"); - EXPECT_THAT(update_properties->Commit(), IsOk()); - auto remove_properties = transaction->UpdateProperties(); - remove_properties->Remove("existing"); - EXPECT_THAT(remove_properties->Commit(), IsOk()); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Set("new-key", "new-value"); + EXPECT_THAT(update_properties.value()->Commit(), IsOk()); + auto remove_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(remove_properties.has_value()); + remove_properties.value()->Remove("existing"); + EXPECT_THAT(remove_properties.value()->Commit(), IsOk()); EXPECT_CALL(*catalog_, UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) - .WillOnce([](const TableIdentifier&, - std::vector> /*requirements*/, - std::vector> updates) - -> Result> { - EXPECT_EQ(2u, updates.size()); - - const auto* set_update = - dynamic_cast(updates[0].get()); - EXPECT_NE(set_update, nullptr); - const auto& updated = set_update->updated(); - auto it = updated.find("new-key"); - EXPECT_NE(it, updated.end()); - EXPECT_EQ("new-value", it->second); - - const auto* remove_update = - dynamic_cast(updates[1].get()); - EXPECT_NE(remove_update, nullptr); - EXPECT_THAT(remove_update->removed(), ::testing::ElementsAre("existing")); - - return {std::unique_ptr
()}; - }); + .WillOnce( + [](const TableIdentifier&, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ(2u, updates.size()); + + const auto* set_update = + dynamic_cast(updates[0].get()); + EXPECT_NE(set_update, nullptr); + const auto& updated = set_update->updated(); + auto it = updated.find("new-key"); + EXPECT_NE(it, updated.end()); + EXPECT_EQ("new-value", it->second); + + const auto* remove_update = + dynamic_cast(updates[1].get()); + EXPECT_NE(remove_update, nullptr); + EXPECT_THAT(remove_update->removed(), ::testing::ElementsAre("existing")); + + return {std::unique_ptr
()}; + }); EXPECT_THAT(transaction->CommitTransaction(), IsOk()); } TEST_F(BaseTransactionTest, FailsIfUpdateNotCommitted) { auto transaction = table_->NewTransaction(); - auto update_properties = transaction->UpdateProperties(); - update_properties->Set("new-key", "new-value"); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Set("new-key", "new-value"); EXPECT_THAT(transaction->CommitTransaction(), IsError(ErrorKind::kInvalidState)); } diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 26eed18de..665903f97 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -41,12 +41,12 @@ class ICEBERG_EXPORT Transaction { /// \brief Create a new update properties operation /// /// \return a new UpdateProperties - virtual std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() = 0; + virtual Result> NewUpdateProperties() = 0; /// \brief Create a new append API to add files to this table /// /// \return a new AppendFiles - virtual std::unique_ptr NewAppend() = 0; + virtual Result> NewAppend() = 0; /// \brief Apply the pending changes from all actions and commit /// diff --git a/src/iceberg/transaction_catalog.cc b/src/iceberg/transaction_catalog.cc index 7f1ebe4dd..ceecfbf05 100644 --- a/src/iceberg/transaction_catalog.cc +++ b/src/iceberg/transaction_catalog.cc @@ -20,18 +20,31 @@ #include "iceberg/transaction_catalog.h" #include "iceberg/base_transaction.h" +#include "iceberg/table_metadata.h" namespace iceberg { Result> TransactionCatalog::UpdateTable( const TableIdentifier& identifier, - std::vector> requirements, - std::vector> updates) { + const std::vector>& requirements, + const std::vector>& updates) { if (!owner_) { return InvalidState("Transaction state is unavailable"); } - return owner_->StageUpdates(identifier, std::move(requirements), std::move(updates)); + return owner_->StageUpdates(identifier, requirements, updates); +} + +Result> TransactionCatalog::LoadTable( + const TableIdentifier& identifier) { + if (!owner_) { + return InvalidState("Transaction state is unavailable"); + } + + auto metadata = std::make_shared(*owner_->context_.current_metadata); + return std::make_unique
(identifier, std::move(metadata), + owner_->table()->metadata_location(), + owner_->table()->io(), catalog_impl_); } void TransactionCatalog::SetLastOperationCommitted(bool committed) { diff --git a/src/iceberg/transaction_catalog.h b/src/iceberg/transaction_catalog.h index a6f30c91b..58cb21684 100644 --- a/src/iceberg/transaction_catalog.h +++ b/src/iceberg/transaction_catalog.h @@ -88,8 +88,8 @@ class ICEBERG_EXPORT TransactionCatalog : public Catalog { Result> UpdateTable( const TableIdentifier& identifier, - std::vector> requirements, - std::vector> updates) override; + const std::vector>& requirements, + const std::vector>& updates) override; Result> StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, @@ -110,9 +110,7 @@ class ICEBERG_EXPORT TransactionCatalog : public Catalog { return NotImplemented("rename table"); } - Result> LoadTable(const TableIdentifier& identifier) override { - return catalog_impl_->LoadTable(identifier); - } + Result> LoadTable(const TableIdentifier& identifier) override; Result> RegisterTable( const TableIdentifier& identifier, diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index 76261da42..da56bb980 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -74,6 +74,10 @@ Status UpdateProperties::Apply() { if (!catalog_) { return InvalidArgument("Catalog is required to apply property updates"); } + + ICEBERG_ASSIGN_OR_RAISE(auto reloaded_table, catalog_->LoadTable(identifier_)); + base_metadata_ = reloaded_table->metadata(); + if (!base_metadata_) { return InvalidArgument("Base table metadata is required to apply property updates"); } @@ -126,8 +130,19 @@ Status UpdateProperties::Commit() { if (!updates.empty()) { ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable(*base_metadata_, updates)); + auto shared_updates = std::vector>{}; + shared_updates.reserve(updates.size()); + for (auto& update : updates) { + shared_updates.push_back(std::move(update)); + } + auto shared_requirements = std::vector>{}; + shared_requirements.reserve(requirements.size()); + for (auto& requirement : requirements) { + shared_requirements.push_back(std::move(requirement)); + } + ICEBERG_RETURN_UNEXPECTED( - catalog_->UpdateTable(identifier_, std::move(requirements), std::move(updates))); + catalog_->UpdateTable(identifier_, shared_requirements, shared_updates)); } catalog_->SetLastOperationCommitted(true); diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h index 0f1adf76a..14eec8ac7 100644 --- a/src/iceberg/update/update_properties.h +++ b/src/iceberg/update/update_properties.h @@ -39,7 +39,7 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { /// /// \param identifier The table identifier /// \param catalog The catalog containing the table - /// \param metadata The current table metadata + /// \param base The current table metadata UpdateProperties(TableIdentifier identifier, std::shared_ptr catalog, std::shared_ptr base); From 56802ceee498d49e5fad9f015b3ceca134f26887 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 13 Dec 2025 17:23:27 +0800 Subject: [PATCH 16/20] feat: transactional UpdateProperties method support --- src/iceberg/CMakeLists.txt | 2 +- src/iceberg/base_transaction.cc | 129 ---------------------- src/iceberg/base_transaction.h | 105 ------------------ src/iceberg/meson.build | 2 +- src/iceberg/table.cc | 6 +- src/iceberg/table.h | 5 +- src/iceberg/test/base_transaction_test.cc | 29 ++++- src/iceberg/transaction.h | 95 ++++++++++++++++ src/iceberg/transaction_catalog.cc | 2 +- 9 files changed, 127 insertions(+), 248 deletions(-) delete mode 100644 src/iceberg/base_transaction.cc delete mode 100644 src/iceberg/base_transaction.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 5576d9c9f..038b64ff2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -19,7 +19,6 @@ set(ICEBERG_INCLUDES "$" "$") set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc - base_transaction.cc catalog/memory/in_memory_catalog.cc expression/aggregate.cc expression/binder.cc @@ -73,6 +72,7 @@ set(ICEBERG_SOURCES table_requirements.cc table_scan.cc table_update.cc + transaction.cc transaction_catalog.cc transform.cc transform_function.cc diff --git a/src/iceberg/base_transaction.cc b/src/iceberg/base_transaction.cc deleted file mode 100644 index bd20d8bdf..000000000 --- a/src/iceberg/base_transaction.cc +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/base_transaction.h" - -#include "iceberg/catalog.h" -#include "iceberg/pending_update.h" -#include "iceberg/table.h" -#include "iceberg/table_metadata.h" -#include "iceberg/transaction_catalog.h" -#include "iceberg/update/update_properties.h" -#include "iceberg/util/macros.h" - -namespace iceberg { - -BaseTransaction::BaseTransaction(std::shared_ptr table, - std::shared_ptr catalog) - : table_(std::move(table)) { - ICEBERG_DCHECK(table_ != nullptr, "table must not be null"); - ICEBERG_DCHECK(catalog != nullptr, "catalog must not be null"); - context_.identifier = table_->name(); - context_.current_metadata = table_->metadata(); - catalog_ = std::make_shared(std::move(catalog), this); -} - -const std::shared_ptr& BaseTransaction::table() const { return table_; } - -Result> BaseTransaction::NewUpdateProperties() { - if (!HasLastOperationCommitted()) { - return InvalidState( - "Cannot create new update: last operation in transaction has not committed"); - } - SetLastOperationCommitted(false); - - auto metadata = std::make_shared(*context_.current_metadata); - return std::make_unique(table_->name(), catalog_, - std::move(metadata)); -} - -Result> BaseTransaction::NewAppend() { - throw NotImplemented("BaseTransaction::NewAppend not implemented"); -} - -Status BaseTransaction::CommitTransaction() { - if (!HasLastOperationCommitted()) { - return InvalidState("Cannot commit transaction: last operation has not committed"); - } - - if (context_.pending_updates.empty()) { - return {}; - } - - ICEBERG_ASSIGN_OR_RAISE( - auto updated_table, - catalog_->catalog_impl()->UpdateTable( - context_.identifier, context_.pending_requirements, context_.pending_updates)); - - context_.pending_requirements.clear(); - context_.pending_updates.clear(); - - return {}; -} - -Result> BaseTransaction::StageUpdates( - const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) { - if (identifier != context_.identifier) { - return InvalidArgument("Transaction only supports table '{}'", - context_.identifier.name); - } - - if (!context_.current_metadata) { - return InvalidState("Transaction metadata is not initialized"); - } - - if (updates.empty()) { - return std::make_unique
( - context_.identifier, std::make_shared(*context_.current_metadata), - table_->metadata_location(), table_->io(), catalog_->catalog_impl()); - } - - ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates)); - context_.pending_requirements.insert(context_.pending_requirements.end(), - requirements.begin(), requirements.end()); - context_.pending_updates.insert(context_.pending_updates.end(), updates.begin(), - updates.end()); - - return std::make_unique
( - context_.identifier, std::make_shared(*context_.current_metadata), - table_->metadata_location(), table_->io(), catalog_->catalog_impl()); -} - -Status BaseTransaction::ApplyUpdates( - const std::vector>& updates) { - if (updates.empty()) { - return {}; - } - - auto builder = TableMetadataBuilder::BuildFrom(context_.current_metadata.get()); - for (const auto& update : updates) { - if (!update) { - continue; - } - update->ApplyTo(*builder); - } - - ICEBERG_ASSIGN_OR_RAISE(auto new_metadata, builder->Build()); - context_.current_metadata = std::shared_ptr(std::move(new_metadata)); - return {}; -} - -} // namespace iceberg diff --git a/src/iceberg/base_transaction.h b/src/iceberg/base_transaction.h deleted file mode 100644 index d7f148c68..000000000 --- a/src/iceberg/base_transaction.h +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include -#include - -#include "iceberg/table_identifier.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" -#include "iceberg/transaction.h" -#include "iceberg/type_fwd.h" - -namespace iceberg { - -/// \brief Base class for transaction implementations -class ICEBERG_EXPORT BaseTransaction : public Transaction { - public: - BaseTransaction(std::shared_ptr table, std::shared_ptr catalog); - ~BaseTransaction() override = default; - - const std::shared_ptr& table() const override; - - Result> NewUpdateProperties() override; - - Result> NewAppend() override; - - Status CommitTransaction() override; - - /// \brief Stage updates to be applied upon commit - /// - /// \param identifier the table identifier - /// \param requirements the list of table requirements to validate - /// \param updates the list of table updates to apply - /// \return a new Table instance with staged updates applied - Result> StageUpdates( - const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates); - - /// \brief Whether the last operation has been committed - /// - /// \return true if the last operation was committed, false otherwise - bool HasLastOperationCommitted() const { return context_.last_operation_committed; } - - /// \brief Mark the last operation as committed or not - /// - /// \param committed true if the last operation was committed, false otherwise - void SetLastOperationCommitted(bool committed) { - context_.last_operation_committed = committed; - } - - protected: - /// \brief Apply a list of table updates to the current metadata - /// - /// \param updates the list of table updates to apply - /// \return Status::OK if the updates were applied successfully, or an error status - Status ApplyUpdates(const std::vector>& updates); - - private: - /// \brief Context for transaction - struct TransactionContext { - TransactionContext() = default; - TransactionContext(TableIdentifier identifier, - std::shared_ptr metadata) - : identifier(std::move(identifier)), current_metadata(std::move(metadata)) {} - - // Non-copyable, movable - TransactionContext(const TransactionContext&) = delete; - TransactionContext& operator=(const TransactionContext&) = delete; - TransactionContext(TransactionContext&&) noexcept = default; - TransactionContext& operator=(TransactionContext&&) noexcept = default; - - bool last_operation_committed = true; - TableIdentifier identifier; - std::shared_ptr current_metadata; - std::vector> pending_requirements; - std::vector> pending_updates; - }; - - std::shared_ptr table_; - std::shared_ptr catalog_; - TransactionContext context_; - - friend class TransactionCatalog; -}; - -} // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index ac2379020..7eac3f57f 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -41,7 +41,6 @@ configure_file( iceberg_include_dir = include_directories('..') iceberg_sources = files( 'arrow_c_data_guard_internal.cc', - 'base_transaction.cc', 'catalog/memory/in_memory_catalog.cc', 'expression/aggregate.cc', 'expression/binder.cc', @@ -95,6 +94,7 @@ iceberg_sources = files( 'table_requirements.cc', 'table_scan.cc', 'table_update.cc', + 'transaction.cc', 'transaction_catalog.cc', 'transform.cc', 'transform_function.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 5251ee6bd..871394133 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,7 +19,6 @@ #include "iceberg/table.h" -#include "iceberg/base_transaction.h" #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" @@ -27,6 +26,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" +#include "iceberg/transaction.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -114,8 +114,8 @@ std::unique_ptr Table::UpdateProperties() const { return std::make_unique(identifier_, catalog_, metadata_); } -std::unique_ptr Table::NewTransaction() const { - return std::make_unique(shared_from_this(), catalog_); +Result> Table::NewTransaction() const { + return Transaction::Make(shared_from_this(), catalog_); } const std::shared_ptr& Table::io() const { return io_; } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 210ff19fe..38483584a 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" @@ -121,8 +122,8 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Create a new transaction for this table /// - /// \return a pointer to the new Transaction - virtual std::unique_ptr NewTransaction() const; + /// \return a new Transaction or an error if the transaction cannot be created + virtual Result> NewTransaction() const; /// \brief Returns a FileIO to read and write table data and metadata files const std::shared_ptr& io() const; diff --git a/src/iceberg/test/base_transaction_test.cc b/src/iceberg/test/base_transaction_test.cc index 8e3a5d1dc..0c1becb95 100644 --- a/src/iceberg/test/base_transaction_test.cc +++ b/src/iceberg/test/base_transaction_test.cc @@ -17,8 +17,6 @@ * under the License. */ -#include "iceberg/base_transaction.h" - #include #include @@ -30,6 +28,7 @@ #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" +#include "iceberg/transaction.h" #include "iceberg/update/update_properties.h" namespace iceberg { @@ -47,13 +46,22 @@ class BaseTransactionTest : public ::testing::Test { "s3://bucket/table/metadata.json", nullptr, catalog_); } + std::unique_ptr NewTransaction() { + auto transaction_result = BaseTransaction::Make(table_, catalog_); + if (!transaction_result.has_value()) { + ADD_FAILURE() << "Failed to create transaction: " + << transaction_result.error().message; + } + return std::move(transaction_result).value(); + } + TableIdentifier identifier_; std::shared_ptr catalog_; std::shared_ptr
table_; }; TEST_F(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { - auto transaction = table_->NewTransaction(); + auto transaction = NewTransaction(); auto update_properties = transaction->NewUpdateProperties(); EXPECT_TRUE(update_properties.has_value()); update_properties.value()->Set("new-key", "new-value"); @@ -82,7 +90,7 @@ TEST_F(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { } TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { - auto transaction = table_->NewTransaction(); + auto transaction = NewTransaction(); auto update_properties = transaction->NewUpdateProperties(); EXPECT_TRUE(update_properties.has_value()); update_properties.value()->Remove("missing").Remove("existing"); @@ -108,7 +116,7 @@ TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { } TEST_F(BaseTransactionTest, AggregatesMultiplePendingUpdates) { - auto transaction = table_->NewTransaction(); + auto transaction = NewTransaction(); auto update_properties = transaction->NewUpdateProperties(); EXPECT_TRUE(update_properties.has_value()); update_properties.value()->Set("new-key", "new-value"); @@ -147,11 +155,20 @@ TEST_F(BaseTransactionTest, AggregatesMultiplePendingUpdates) { } TEST_F(BaseTransactionTest, FailsIfUpdateNotCommitted) { - auto transaction = table_->NewTransaction(); + auto transaction = NewTransaction(); auto update_properties = transaction->NewUpdateProperties(); EXPECT_TRUE(update_properties.has_value()); update_properties.value()->Set("new-key", "new-value"); EXPECT_THAT(transaction->CommitTransaction(), IsError(ErrorKind::kInvalidState)); } +TEST_F(BaseTransactionTest, NewTransactionFailsWithoutCatalog) { + auto metadata = std::make_shared(); + auto table_without_catalog = + std::make_shared
(identifier_, std::move(metadata), + "s3://bucket/table/metadata.json", nullptr, nullptr); + EXPECT_THAT(table_without_catalog->NewTransaction(), + IsError(ErrorKind::kInvalidArgument)); +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 665903f97..6a48d7ff3 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -21,9 +21,13 @@ #pragma once #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -33,6 +37,14 @@ class ICEBERG_EXPORT Transaction { public: virtual ~Transaction() = default; + /// \brief Construct a transaction with validation + /// + /// \param table the table to update + /// \param catalog the catalog backing the table + /// \return the constructed transaction or an error if arguments are invalid + static Result> Make(std::shared_ptr table, + std::shared_ptr catalog); + /// \brief Return the Table that this transaction will update /// /// \return this transaction's table @@ -58,4 +70,87 @@ class ICEBERG_EXPORT Transaction { virtual Status CommitTransaction() = 0; }; +/// \brief Base implementation shared by table transactions +class ICEBERG_EXPORT BaseTransaction : public Transaction { + public: + ~BaseTransaction() override = default; + + /// \brief Construct a BaseTransaction with validation + /// + /// \param table the table to update + /// \param catalog the catalog backing the table + /// \return the constructed transaction or an error if arguments are invalid + static Result> Make(std::shared_ptr table, + std::shared_ptr catalog); + + const std::shared_ptr& table() const override; + + Result> NewUpdateProperties() override; + + Result> NewAppend() override; + + Status CommitTransaction() override; + + /// \brief Stage updates to be applied upon commit + /// + /// \param identifier the table identifier + /// \param requirements the list of table requirements to validate + /// \param updates the list of table updates to apply + /// \return a new Table instance with staged updates applied + Result> StageUpdates( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates); + + /// \brief Whether the last operation has been committed + /// + /// \return true if the last operation was committed, false otherwise + bool HasLastOperationCommitted() const { return context_.last_operation_committed; } + + /// \brief Mark the last operation as committed or not + /// + /// \param committed true if the last operation was committed, false otherwise + void SetLastOperationCommitted(bool committed) { + context_.last_operation_committed = committed; + } + + protected: + BaseTransaction(std::shared_ptr table, std::shared_ptr catalog); + + /// \brief Apply a list of table updates to the current metadata + /// + /// \param updates the list of table updates to apply + /// \return Status::OK if the updates were applied successfully, or an error status + Status ApplyUpdates(const std::vector>& updates); + + private: + /// \brief Context for transaction + struct TransactionContext { + TransactionContext() = default; + TransactionContext(TableIdentifier identifier, + std::shared_ptr metadata) + : identifier(std::move(identifier)), current_metadata(std::move(metadata)) {} + + // Non-copyable, movable + TransactionContext(const TransactionContext&) = delete; + TransactionContext& operator=(const TransactionContext&) = delete; + TransactionContext(TransactionContext&&) noexcept = default; + TransactionContext& operator=(TransactionContext&&) noexcept = default; + + bool last_operation_committed = true; + TableIdentifier identifier; + std::shared_ptr current_metadata; + std::vector> pending_requirements; + std::vector> pending_updates; + }; + + std::shared_ptr table_; + std::shared_ptr catalog_; + TransactionContext context_; + + friend Result> Transaction::Make( + std::shared_ptr, std::shared_ptr); + friend class TransactionCatalog; +}; + } // namespace iceberg diff --git a/src/iceberg/transaction_catalog.cc b/src/iceberg/transaction_catalog.cc index ceecfbf05..62929793f 100644 --- a/src/iceberg/transaction_catalog.cc +++ b/src/iceberg/transaction_catalog.cc @@ -19,8 +19,8 @@ #include "iceberg/transaction_catalog.h" -#include "iceberg/base_transaction.h" #include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" namespace iceberg { From 5efe0e0d9fd53909e3c4db3a2c1270282538fe32 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 13 Dec 2025 17:46:47 +0800 Subject: [PATCH 17/20] feat: transactional UpdateProperties method support --- src/iceberg/transaction.cc | 145 +++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 src/iceberg/transaction.cc diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc new file mode 100644 index 000000000..c58cdb367 --- /dev/null +++ b/src/iceberg/transaction.cc @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/transaction.h" + +#include "iceberg/catalog.h" +#include "iceberg/pending_update.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction_catalog.h" +#include "iceberg/update/update_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> Transaction::Make(std::shared_ptr table, + std::shared_ptr catalog) { + return BaseTransaction::Make(std::move(table), std::move(catalog)); +} + +Result> BaseTransaction::Make( + std::shared_ptr table, std::shared_ptr catalog) { + if (!table) { + return InvalidArgument("Transaction::Make requires a table"); + } + if (!catalog) { + return InvalidArgument("Transaction::Make requires a catalog"); + } + + return std::unique_ptr( + new BaseTransaction(std::move(table), std::move(catalog))); +} + +BaseTransaction::BaseTransaction(std::shared_ptr table, + std::shared_ptr catalog) + : table_(std::move(table)) { + context_.identifier = table_->name(); + context_.current_metadata = table_->metadata(); + catalog_ = std::make_shared(std::move(catalog), this); +} + +const std::shared_ptr& BaseTransaction::table() const { return table_; } + +Result> BaseTransaction::NewUpdateProperties() { + if (!HasLastOperationCommitted()) { + return InvalidState( + "Cannot create new update: last operation in transaction has not committed"); + } + SetLastOperationCommitted(false); + + auto metadata = std::make_shared(*context_.current_metadata); + return std::make_unique(table_->name(), catalog_, + std::move(metadata)); +} + +Result> BaseTransaction::NewAppend() { + throw NotImplemented("BaseTransaction::NewAppend not implemented"); +} + +Status BaseTransaction::CommitTransaction() { + if (!HasLastOperationCommitted()) { + return InvalidState("Cannot commit transaction: last operation has not committed"); + } + + if (context_.pending_updates.empty()) { + return {}; + } + + ICEBERG_ASSIGN_OR_RAISE( + auto updated_table, + catalog_->catalog_impl()->UpdateTable( + context_.identifier, context_.pending_requirements, context_.pending_updates)); + + context_.pending_requirements.clear(); + context_.pending_updates.clear(); + + return {}; +} + +Result> BaseTransaction::StageUpdates( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) { + if (identifier != context_.identifier) { + return InvalidArgument("Transaction only supports table '{}'", + context_.identifier.name); + } + + if (!context_.current_metadata) { + return InvalidState("Transaction metadata is not initialized"); + } + + if (updates.empty()) { + return std::make_unique
( + context_.identifier, std::make_shared(*context_.current_metadata), + table_->metadata_location(), table_->io(), catalog_->catalog_impl()); + } + + ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates)); + context_.pending_requirements.insert(context_.pending_requirements.end(), + requirements.begin(), requirements.end()); + context_.pending_updates.insert(context_.pending_updates.end(), updates.begin(), + updates.end()); + + return std::make_unique
( + context_.identifier, std::make_shared(*context_.current_metadata), + table_->metadata_location(), table_->io(), catalog_->catalog_impl()); +} + +Status BaseTransaction::ApplyUpdates( + const std::vector>& updates) { + if (updates.empty()) { + return {}; + } + + auto builder = TableMetadataBuilder::BuildFrom(context_.current_metadata.get()); + for (const auto& update : updates) { + if (!update) { + continue; + } + update->ApplyTo(*builder); + } + + ICEBERG_ASSIGN_OR_RAISE(auto new_metadata, builder->Build()); + context_.current_metadata = std::shared_ptr(std::move(new_metadata)); + return {}; +} + +} // namespace iceberg From 9579da2262afc117d20cfbc58f0155d9c704607f Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 13 Dec 2025 18:22:08 +0800 Subject: [PATCH 18/20] feat: transactional UpdateProperties method support --- src/iceberg/test/update_properties_test.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 4f180d8f7..fd104c3a7 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -53,6 +53,13 @@ class UpdatePropertiesTest : public ::testing::Test { // Create catalog and table identifier catalog_ = std::make_shared<::testing::NiceMock>(); + ON_CALL(*catalog_, LoadTable(::testing::_)) + .WillByDefault([this](const TableIdentifier&) -> Result> { + return std::make_unique
(identifier_, metadata_, + "s3://bucket/table/metadata.json", nullptr, + catalog_); + }); + identifier_ = TableIdentifier(Namespace({"test"}), "table"); } @@ -161,8 +168,11 @@ TEST_F(UpdatePropertiesTest, InvalidTable) { { // metadata is null - UpdateProperties update(identifier_, catalog_, nullptr); + auto catalog = std::make_shared<::testing::NiceMock>(); + EXPECT_CALL(*catalog, LoadTable(::testing::_)) + .WillOnce(::testing::Return(InvalidArgument("Base table metadata is required"))); + UpdateProperties update(identifier_, catalog, nullptr); auto result = update.Apply(); EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); EXPECT_THAT(result, HasErrorMessage("Base table metadata is required")); From 51dbf1f0435e2832f85e21d2dea59782b4e02a46 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 13 Dec 2025 18:30:17 +0800 Subject: [PATCH 19/20] feat: transactional UpdateProperties method support --- src/iceberg/catalog/memory/in_memory_catalog.cc | 2 -- src/iceberg/catalog/rest/rest_catalog.cc | 2 -- src/iceberg/test/in_memory_catalog_test.cc | 2 -- src/iceberg/test/update_properties_test.cc | 2 -- 4 files changed, 8 deletions(-) diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index a41a52d4b..645ee43eb 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -24,8 +24,6 @@ #include "iceberg/table.h" #include "iceberg/table_metadata.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" #include "iceberg/util/macros.h" namespace iceberg { diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 62cf3c3aa..c1ef11e55 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -37,8 +37,6 @@ #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/table.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" #include "iceberg/util/macros.h" namespace iceberg::rest { diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index 617b4a876..f7e2f50a9 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -29,8 +29,6 @@ #include "iceberg/schema.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" #include "iceberg/test/test_resource.h" diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index fd104c3a7..4f40d478b 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -33,8 +33,6 @@ #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" From df573b4cbe228ee323dcde02eb467fd153e613ef Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 13 Dec 2025 18:56:43 +0800 Subject: [PATCH 20/20] feat: transactional UpdateProperties method support --- src/iceberg/type_fwd.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index ed66fd721..f89378626 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -183,8 +183,6 @@ class TableUpdateContext; class PendingUpdate; class UpdateProperties; -struct PropertiesUpdateChanges; - /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. /// ----------------------------------------------------------------------------