From 6c685018371053c585fa4e1a95b2086349c8a4c0 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sun, 14 Dec 2025 18:47:38 +0800 Subject: [PATCH] feat: add snapshot update --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/manifest/manifest_writer.cc | 19 ++ src/iceberg/manifest/manifest_writer.h | 15 + src/iceberg/table.h | 20 +- src/iceberg/table_properties.h | 3 + src/iceberg/transaction.h | 5 +- src/iceberg/type_fwd.h | 2 + src/iceberg/update/meson.build | 2 +- src/iceberg/update/snapshot_update.cc | 340 ++++++++++++++++++++++ src/iceberg/update/snapshot_update.h | 219 ++++++++++++++ src/iceberg/util/snapshot_util.cc | 14 + src/iceberg/util/snapshot_util_internal.h | 9 + src/iceberg/util/uuid.cc | 12 + src/iceberg/util/uuid.h | 3 + 14 files changed, 652 insertions(+), 12 deletions(-) create mode 100644 src/iceberg/update/snapshot_update.cc create mode 100644 src/iceberg/update/snapshot_update.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0579c67d2..0552673fa 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -77,6 +77,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc + update/snapshot_update.cc update/update_properties.cc util/bucket_util.cc util/conversions.cc diff --git a/src/iceberg/manifest/manifest_writer.cc b/src/iceberg/manifest/manifest_writer.cc index 8cd940d56..f0b890b7c 100644 --- a/src/iceberg/manifest/manifest_writer.cc +++ b/src/iceberg/manifest/manifest_writer.cc @@ -447,4 +447,23 @@ Result> ManifestListWriter::MakeV3Writer( new ManifestListWriter(std::move(writer), std::move(adapter))); } +Result> ManifestListWriter::Make( + int8_t format_version, int64_t snapshot_id, std::optional parent_snapshot_id, + std::string_view manifest_list_location, std::shared_ptr file_io, + int64_t sequence_number, int64_t first_row_id) { + switch (format_version) { + case 1: + return MakeV1Writer(snapshot_id, parent_snapshot_id, manifest_list_location, + file_io); + case 2: + return MakeV2Writer(snapshot_id, parent_snapshot_id, sequence_number, + manifest_list_location, file_io); + case 3: + return MakeV3Writer(snapshot_id, parent_snapshot_id, sequence_number, first_row_id, + manifest_list_location, file_io); + default: + std::unreachable(); + } +} + } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_writer.h b/src/iceberg/manifest/manifest_writer.h index a708e6e3b..14201c704 100644 --- a/src/iceberg/manifest/manifest_writer.h +++ b/src/iceberg/manifest/manifest_writer.h @@ -236,6 +236,21 @@ class ICEBERG_EXPORT ManifestListWriter { int64_t sequence_number, int64_t first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); + /// \brief Creates a writer for the manifest list based on the format version. + /// \param format_version Format version of the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \param sequence_number Sequence number of the snapshot. + /// \param first_row_id First row ID of the snapshot. + /// \return A Result containing the writer or an error. + static Result> Make( + int8_t format_version, int64_t snapshot_id, + std::optional parent_snapshot_id, std::string_view manifest_list_location, + std::shared_ptr file_io, int64_t sequence_number = 0, + int64_t first_row_id = 0); + private: // Private constructor for internal use only, use the static Make*Writer methods // instead. diff --git a/src/iceberg/table.h b/src/iceberg/table.h index efe175828..dd99e3e4f 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { virtual ~Table(); - /// \brief Return the identifier of this table + /// \brief Returns the identifier of this table const TableIdentifier& name() const { return identifier_; } /// \brief Returns the UUID of the table @@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Return the schema for this table, return NotFoundError if not found Result> schema() const; - /// \brief Return a map of schema for this table + /// \brief Returns a map of schema for this table Result< std::reference_wrapper>>> schemas() const; - /// \brief Return the partition spec for this table, return NotFoundError if not found + /// \brief Returns the partition spec for this table, return NotFoundError if not found Result> spec() const; - /// \brief Return a map of partition specs for this table + /// \brief Returns a map of partition specs for this table Result>>> specs() const; - /// \brief Return the sort order for this table, return NotFoundError if not found + /// \brief Returns the sort order for this table, return NotFoundError if not found Result> sort_order() const; - /// \brief Return a map of sort order IDs to sort orders for this table + /// \brief Returns a map of sort order IDs to sort orders for this table Result>>> sort_orders() const; - /// \brief Return a map of string properties for this table + /// \brief Returns the properties of this table const TableProperties& properties() const; - /// \brief Return the table's metadata file location + /// \brief Returns the table's metadata file location std::string_view metadata_file_location() const; - /// \brief Return the table's base location + /// \brief Returns the table's base location std::string_view location() const; /// \brief Returns the time when this table was last updated TimePointMs last_updated_ms() const; - /// \brief Return the table's current snapshot, return NotFoundError if not found + /// \brief Returns the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; /// \brief Get the snapshot of this table with the given id diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h index debe61da2..b953a7ef4 100644 --- a/src/iceberg/table_properties.h +++ b/src/iceberg/table_properties.h @@ -244,6 +244,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase { inline static Entry kDeleteTargetFileSizeBytes{ "write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024}; // 64 MB + inline static Entry kSnapshotIdInheritanceEnabled{ + "compatibility.snapshot-id-inheritance.enabled", false}; + // Garbage collection properties inline static Entry kGcEnabled{"gc.enabled", true}; diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 36328026b..b1e18dddf 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -68,7 +68,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> updates); - friend class PendingUpdate; // Need to access the Apply method. + /// \brief Friends to access the Apply method. + friend class PendingUpdate; + template + friend class SnapshotUpdate; private: // The table that this transaction will update. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 133a7043c..8acbdcea5 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -178,6 +178,8 @@ class Transaction; /// \brief Update family. class PendingUpdate; class UpdateProperties; +template +class SnapshotUpdate; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 38502b14e..996229b16 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -16,6 +16,6 @@ # under the License. install_headers( - ['pending_update.h', 'update_properties.h'], + ['pending_update.h', 'update_properties.h', 'snapshot_update.h'], subdir: 'iceberg/update', ) diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc new file mode 100644 index 000000000..4af3ba0cd --- /dev/null +++ b/src/iceberg/update/snapshot_update.cc @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_update.h" + +#include +#include +#include + +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/table.h" +#include "iceberg/table_requirements.h" +#include "iceberg/table_update.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +template +SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) { + // Read target manifest size bytes + target_manifest_size_bytes_ = + transaction_->table()->properties().Get(TableProperties::kManifestTargetSizeBytes); + + // For format version 1, check if snapshot ID inheritance is enabled + if (transaction_->table()->metadata()->format_version == 1) { + can_inherit_snapshot_id_ = transaction_->table()->properties().Get( + TableProperties::kSnapshotIdInheritanceEnabled); + } +} + +template +std::vector SnapshotUpdate::WriteDataManifests( + const std::vector& data_files) { + // TODO(any): Implement + return {}; +} + +template +std::vector SnapshotUpdate::WriteDeleteManifests( + const std::vector& delete_files) { + // TODO(any): Implement + return {}; +} + +template +Result SnapshotUpdate::Apply() { + // Get the latest snapshot for the target branch + std::shared_ptr parent_snapshot; + if (auto ref_it = transaction_->table()->metadata()->refs.find(target_branch_); + ref_it != transaction_->table()->metadata()->refs.end()) { + ICEBERG_ASSIGN_OR_RAISE(parent_snapshot, transaction_->table()->SnapshotById( + ref_it->second->snapshot_id)); + } else { + auto current_snapshot_result = transaction_->table()->current_snapshot(); + + ICEBERG_ASSIGN_OR_RAISE(parent_snapshot, transaction_->table()->current_snapshot()); + } + + // Generate snapshot ID + int64_t new_snapshot_id = SnapshotUtil::GenerateSnapshotId(*transaction_->table()); + std::optional parent_snapshot_id = + parent_snapshot ? std::make_optional(parent_snapshot->snapshot_id) : std::nullopt; + + // Get sequence number + int64_t sequence_number = transaction_->table()->metadata()->last_sequence_number + 1; + + // Write manifest list + std::string manifest_list_path = ManifestListPath(); + manifest_lists_.push_back(manifest_list_path); + + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestListWriter::Make(transaction_->table()->metadata()->format_version, + new_snapshot_id, parent_snapshot_id, manifest_list_path, + transaction_->table()->io(), sequence_number, + transaction_->table()->metadata()->next_row_id)); + + ICEBERG_RETURN_UNEXPECTED( + writer->AddAll(Apply(transaction_->table()->metadata(), parent_snapshot))); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + // Compute summary + std::unordered_map summary = + ComputeSummary(transaction_->table()->metadata()); + + // Get current time + auto now = std::chrono::system_clock::now(); + auto duration_since_epoch = now.time_since_epoch(); + TimePointMs timestamp_ms = std::chrono::time_point_cast( + std::chrono::system_clock::time_point(duration_since_epoch)); + + // Get schema ID + std::optional schema_id = transaction_->table()->metadata()->current_schema_id; + + // Create snapshot + staged_snapshot_ = + std::make_shared(Snapshot{.snapshot_id = new_snapshot_id, + .parent_snapshot_id = parent_snapshot_id, + .sequence_number = sequence_number, + .timestamp_ms = timestamp_ms, + .manifest_list = manifest_list_path, + .summary = std::move(summary), + .schema_id = schema_id}); + + // Build metadata update + auto builder = TableMetadataBuilder::BuildFrom(transaction_->table()->metadata().get()); + + // Check if this is a rollback (snapshot already exists) + auto existing_snapshot_result = + transaction_->table()->metadata()->SnapshotById(staged_snapshot_->snapshot_id); + if (existing_snapshot_result.has_value()) { + // Rollback operation + builder->SetBranchSnapshot(staged_snapshot_->snapshot_id, target_branch_); + } else if (stage_only_) { + // Stage only - add snapshot but don't set as current + builder->AddSnapshot(staged_snapshot_); + } else { + // Normal commit - set as branch snapshot + builder->SetBranchSnapshot(staged_snapshot_->snapshot_id, target_branch_); + } + + // Build updated metadata + ICEBERG_ASSIGN_OR_RAISE(auto updated_metadata, builder->Build()); + + // Check if metadata has changed + if (*updated_metadata == *transaction_->table()->metadata()) { + // No changes, commit successful + return ApplyResult{}; + } + + // Ensure UUID is set + if (updated_metadata->table_uuid.empty()) { + builder->AssignUUID(); + ICEBERG_ASSIGN_OR_RAISE(updated_metadata, builder->Build()); + } + + // Create table updates + std::vector> updates; + updates.push_back(std::make_unique(staged_snapshot_)); + if (!stage_only_) { + // Set branch snapshot using SetSnapshotRef + updates.push_back(std::make_unique( + target_branch_, staged_snapshot_->snapshot_id, SnapshotRefType::kBranch)); + } + + // Create requirements + auto requirements_result = + TableRequirements::ForUpdateTable(*transaction_->table()->metadata(), updates); + ICEBERG_ASSIGN_OR_RAISE(auto requirements, std::move(requirements_result)); + + return ApplyResult{std::move(updates)}; +} + +template +Status SnapshotUpdate::Commit() { + ICEBERG_ASSIGN_OR_RAISE(auto apply_result, Apply()); + + auto status = transaction_->Apply(std::move(apply_result.updates)); + + // Cleanup after successful commit + if (status.has_value() && staged_snapshot_ && cleanup_after_commit()) { + CleanUncommitted(committed_manifest_paths_); + // Clean up unused manifest lists + for (const auto& manifest_list : manifest_lists_) { + if (manifest_list != staged_snapshot_->manifest_list) { + DeleteFile(manifest_list); + } + } + } + + return status; +} + +template +void SnapshotUpdate::SetTargetBranch(const std::string& branch) { + if (branch.empty()) { + AddError(ErrorKind::kInvalidArgument, "Invalid branch name: empty"); + return; + } + + auto ref_it = transaction_->table()->metadata()->refs.find(branch); + if (ref_it != transaction_->table()->metadata()->refs.end()) { + if (ref_it->second->type() != SnapshotRefType::kBranch) { + AddError( + ErrorKind::kInvalidArgument, + "{} is a tag, not a branch. Tags cannot be targets for producing snapshots", + branch); + return; + } + } + + target_branch_ = branch; +} + +template +std::unordered_map SnapshotUpdate::ComputeSummary( + const std::shared_ptr& previous) { + std::unordered_map summary = Summary(); + + auto op = operation(); + if (!op.empty()) { + summary[SnapshotSummaryFields::kOperation] = op; + } + + // Get previous summary + std::unordered_map previous_summary; + if (auto ref_it = previous->refs.find(target_branch_); ref_it != previous->refs.end()) { + auto snapshot_result = previous->SnapshotById(ref_it->second->snapshot_id); + if (snapshot_result.has_value() && (*snapshot_result)->summary.size() > 0) { + previous_summary = (*snapshot_result)->summary; + } + } + + // If no previous summary, initialize with zeros + if (previous_summary.empty()) { + previous_summary[SnapshotSummaryFields::kTotalRecords] = "0"; + previous_summary[SnapshotSummaryFields::kTotalFileSize] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDataFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDeleteFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalPosDeletes] = "0"; + previous_summary[SnapshotSummaryFields::kTotalEqDeletes] = "0"; + } + + // Copy all summary properties from the implementation + for (const auto& [key, value] : summary_properties_) { + summary[key] = value; + } + + // Update totals + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalRecords, + SnapshotSummaryFields::kAddedRecords, + SnapshotSummaryFields::kDeletedRecords); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalFileSize, + SnapshotSummaryFields::kAddedFileSize, + SnapshotSummaryFields::kRemovedFileSize); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalDataFiles, + SnapshotSummaryFields::kAddedDataFiles, + SnapshotSummaryFields::kDeletedDataFiles); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalDeleteFiles, + SnapshotSummaryFields::kAddedDeleteFiles, + SnapshotSummaryFields::kRemovedDeleteFiles); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalPosDeletes, + SnapshotSummaryFields::kAddedPosDeletes, + SnapshotSummaryFields::kRemovedPosDeletes); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalEqDeletes, + SnapshotSummaryFields::kAddedEqDeletes, + SnapshotSummaryFields::kRemovedEqDeletes); + + return summary; +} + +template +void SnapshotUpdate::CleanAll() { + for (const auto& manifest_list : manifest_lists_) { + DeleteFile(manifest_list); + } + manifest_lists_.clear(); + CleanUncommitted(committed_manifest_paths_); + committed_manifest_paths_.clear(); +} + +template +Status SnapshotUpdate::DeleteFile(const std::string& path) { + return delete_func_(path); +} + +template +std::string SnapshotUpdate::ManifestListPath() { + // Generate manifest list path + // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro + std::string filename = std::format( + "snap-{}-{}-{}.avro", SnapshotUtil::GenerateSnapshotId(*transaction_->table()), + attempt_.fetch_add(1) + 1, commit_uuid_); + return std::format("{}/metadata/{}", transaction_->table()->location(), filename); +} + +template +void SnapshotUpdate::UpdateTotal( + std::unordered_map& summary, + const std::unordered_map& previous_summary, + const std::string& total_property, const std::string& added_property, + const std::string& deleted_property) { + auto total_it = previous_summary.find(total_property); + if (total_it != previous_summary.end()) { + int64_t new_total; + auto [_, ec] = + std::from_chars(total_it->second.data(), + total_it->second.data() + total_it->second.size(), new_total); + if (ec != std::errc()) [[unlikely]] { + // Ignore and do not add total + return; + } + + auto added_it = summary.find(added_property); + if (new_total >= 0 && added_it != summary.end()) { + int64_t added_value; + auto [_, ec] = + std::from_chars(added_it->second.data(), + added_it->second.data() + added_it->second.size(), added_value); + if (ec == std::errc()) [[unlikely]] { + new_total += added_value; + } + } + + auto deleted_it = summary.find(deleted_property); + if (new_total >= 0 && deleted_it != summary.end()) { + int64_t deleted_value; + auto [_, ec] = std::from_chars( + deleted_it->second.data(), + deleted_it->second.data() + deleted_it->second.size(), deleted_value); + if (ec == std::errc()) [[unlikely]] { + new_total -= deleted_value; + } + } + + if (new_total >= 0) { + summary[total_property] = std::to_string(new_total); + } + } +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h new file mode 100644 index 000000000..db1f602a4 --- /dev/null +++ b/src/iceberg/update/snapshot_update.h @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/catalog.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +namespace iceberg { + +/// \brief Base class for operations that produce snapshots. +/// +/// This class provides common functionality for creating new snapshots, +/// including manifest list writing, commit retries, and cleanup. +/// +/// \tparam T The concrete subclass type +template +class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { + public: + ~SnapshotUpdate() override = default; + + /// \brief Set a summary property in the snapshot produced by this update. + /// + /// \param property A string property name + /// \param value A string property value + /// \return Reference to this for method chaining + T& Set(const std::string& property, const std::string& value) { + summary_properties_[property] = value; + return static_cast(*this); + } + + /// \brief Set a callback to delete files instead of the table's default. + /// + /// \param delete_func A function used to delete file locations + /// \return Reference to this for method chaining + T& DeleteWith(std::function delete_func) { + delete_func_ = std::move(delete_func); + return static_cast(*this); + } + + /// \brief Stage a snapshot in table metadata, but not update the current snapshot id. + /// + /// \return Reference to this for method chaining + T& StageOnly() { + stage_only_ = true; + return static_cast(*this); + } + + /// \brief Apply the update's changes to create a new snapshot. + /// + /// This method validates the changes, applies them to the metadata, + /// and creates a new snapshot without committing it. The snapshot + /// is stored internally and can be accessed after Apply() succeeds. + /// + /// \return A result containing the apply result, or an error + Result Apply() override; + + /// \brief Commits the snapshot changes to the table. + /// + /// This method applies the changes and commits them through the catalog. + /// It handles retries and cleanup of uncommitted files. + /// + /// \return Status::OK if the commit was successful, or an error + Status Commit() override; + + protected: + explicit SnapshotUpdate(std::shared_ptr transaction); + + /// \brief Write data manifests for the given data files + /// + /// \param data_files The data files to write + /// \return A vector of manifest files + std::vector WriteDataManifests(const std::vector& data_files); + + /// \brief Write delete manifests for the given delete files + /// + /// \param delete_files The delete files to write + /// \return A vector of manifest files + std::vector WriteDeleteManifests( + const std::vector& delete_files); + + /// \brief Get the target branch name + const std::string& target_branch() const { return target_branch_; } + + /// \brief Set the target branch name + void SetTargetBranch(const std::string& branch); + + /// \brief Check if snapshot ID inheritance is enabled + bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } + + /// \brief Get the target manifest size in bytes + int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } + + /// \brief Get the commit UUID + const std::string& commit_uuid() const { return commit_uuid_; } + + /// \brief Get the attempt number + int32_t attempt() const { return attempt_.load(); } + + /// \brief Get the manifest count + int32_t manifest_count() const { return manifest_count_.load(); } + + /// \brief Clean up any uncommitted manifests that were created. + /// + /// Manifests may not be committed if apply is called multiple times + /// because a commit conflict has occurred. Implementations may keep + /// around manifests because the same changes will be made by both + /// apply calls. This method instructs the implementation to clean up + /// those manifests and passes the paths of the manifests that were + /// actually committed. + /// + /// \param committed A set of manifest paths that were actually committed + virtual void CleanUncommitted(const std::unordered_set& committed) = 0; + + /// \brief A string that describes the action that produced the new snapshot. + /// + /// \return A string operation name + virtual std::string operation() = 0; + + /// \brief Validate the current metadata. + /// + /// Child operations can override this to add custom validation. + /// + /// \param current_metadata Current table metadata to validate + /// \param snapshot Ending snapshot on the lineage which is being validated + virtual Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) = 0; + + /// \brief Apply the update's changes to the given metadata and snapshot. + /// Return the new manifest list. + /// + /// \param metadata_to_update The base table metadata to apply changes to + /// \param snapshot Snapshot to apply the changes to + /// \return A vector of manifest files for the new snapshot + virtual std::vector Apply( + const std::shared_ptr& metadata_to_update, + const std::shared_ptr& snapshot) = 0; + + /// \brief Get the summary map for this operation. + /// + /// \return A map of summary properties + virtual std::unordered_map Summary() = 0; + + /// \brief Check if cleanup should happen after commit + /// + /// \return True if cleanup should happen after commit + virtual bool cleanup_after_commit() const { return true; } + + private: + /// \brief Compute the final summary including totals + std::unordered_map ComputeSummary( + const std::shared_ptr& previous); + + /// \brief Clean up all uncommitted files + void CleanAll(); + + /// \brief Delete a file using the configured delete function + Status DeleteFile(const std::string& path); + + /// \brief Get the path for a manifest list file + std::string ManifestListPath(); + + /// \brief Update a total property in the summary + void UpdateTotal(std::unordered_map& summary, + const std::unordered_map& previous_summary, + const std::string& total_property, const std::string& added_property, + const std::string& deleted_property); + + std::unordered_map summary_properties_; + std::function delete_func_; + bool stage_only_ = false; + std::string target_branch_ = std::string(SnapshotRef::kMainBranch); + + std::optional snapshot_id_{std::nullopt}; + std::atomic attempt_{0}; + std::atomic manifest_count_{0}; + std::unordered_set committed_manifest_paths_; + std::vector manifest_lists_; + std::string commit_uuid_; + std::shared_ptr staged_snapshot_; + + int64_t target_manifest_size_bytes_; + // For format version > 1, inheritance is enabled by default + bool can_inherit_snapshot_id_{true}; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 1243a1093..fab5c11a0 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -26,6 +26,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" +#include "iceberg/util/uuid.h" namespace iceberg { @@ -320,4 +321,17 @@ Result> SnapshotUtil::LatestSnapshot( return metadata.SnapshotById(it->second->snapshot_id); } +int64_t SnapshotUtil::GenerateSnapshotId() { + auto uuid = Uuid::GenerateV7(); + return (uuid.highbits() ^ uuid.lowbits()) & std::numeric_limits::max(); +} + +int64_t SnapshotUtil::GenerateSnapshotId(const Table& table) { + auto snapshot_id = GenerateSnapshotId(); + while (table.SnapshotById(snapshot_id).has_value()) { + snapshot_id = GenerateSnapshotId(); + } + return snapshot_id; +} + } // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index e0d8830ff..53b4ba967 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -247,6 +247,15 @@ class ICEBERG_EXPORT SnapshotUtil { static Result> LatestSnapshot(const TableMetadata& metadata, const std::string& branch); + /// \brief Generate a new snapshot ID. + static int64_t GenerateSnapshotId(); + + /// \brief Generate a new snapshot ID for the given table. + /// + /// \param table The table + /// \return A new snapshot ID + static int64_t GenerateSnapshotId(const Table& table); + private: /// \brief Helper function to traverse ancestors of a snapshot. /// diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc index 9322deb93..8fea859c2 100644 --- a/src/iceberg/util/uuid.cc +++ b/src/iceberg/util/uuid.cc @@ -217,4 +217,16 @@ std::string Uuid::ToString() const { data_[15]); } +int64_t Uuid::highbits() const { + int64_t result; + std::memcpy(&result, data_.data(), 8); + return result; +} + +int64_t Uuid::lowbits() const { + int64_t result; + std::memcpy(&result, data_.data() + 8, 8); + return result; +} + } // namespace iceberg diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h index 64db7c5d6..923736ecc 100644 --- a/src/iceberg/util/uuid.h +++ b/src/iceberg/util/uuid.h @@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable { return lhs.data_ == rhs.data_; } + int64_t highbits() const; + int64_t lowbits() const; + private: std::array data_; };