Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
update/update_sort_order.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_sort_order.cc
util/bucket_util.cc
util/conversions.cc
util/decimal.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/pending_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_sort_order.cc',
'util/bucket_util.cc',
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
return std::make_unique<StructType>(std::move(partition_fields));
}

bool PartitionSpec::SameSpec(const PartitionSpec& other) const {
return fields_ == other.fields_ &&
last_assigned_field_id_ == other.last_assigned_field_id_;
}

std::string PartitionSpec::ToString() const {
std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_);
for (const auto& field : fields_) {
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \brief Get the partition type binding to the input schema.
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;

/// \brief Checks whether this partition spec is equivalent to another partition spec
/// while ignoring the spec id.
bool SameSpec(const PartitionSpec& other) const;

std::string ToString() const override;

int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
Expand Down
10 changes: 10 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "iceberg/table.h"

#include <memory>

#include "iceberg/catalog.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
Expand All @@ -28,6 +30,7 @@
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -147,6 +150,13 @@ Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
/*auto_commit=*/false);
}

Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
/*auto_commit=*/true));
return transaction->NewUpdatePartitionSpec();
}

Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// \brief Create a new Transaction to commit multiple table operations at once.
virtual Result<std::shared_ptr<Transaction>> NewTransaction();

/// \brief Create a new UpdatePartitionSpec to update the partition spec of this table
/// and commit the changes.
virtual Result<std::shared_ptr<UpdatePartitionSpec>> NewUpdatePartitionSpec();

/// \brief Create a new UpdateProperties to update table properties and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();
Expand Down
84 changes: 80 additions & 4 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ class TableMetadataBuilder::Impl {
Result<int32_t> AddSortOrder(const SortOrder& order);
Status SetProperties(const std::unordered_map<std::string, std::string>& updated);
Status RemoveProperties(const std::unordered_set<std::string>& removed);

Status SetDefaultPartitionSpec(int32_t spec_id);
Result<int32_t> AddPartitionSpec(const PartitionSpec& spec);
std::unique_ptr<TableMetadata> Build();

private:
Expand All @@ -438,6 +439,12 @@ class TableMetadataBuilder::Impl {
/// \return The ID to use for this sort order (reused if exists, new otherwise)
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);

/// \brief Internal method to check for existing partition spec and reuse its ID or
/// create a new one
/// \param new_spec The partition spec to check
/// \return The ID to use for this partition spec (reused if exists, new otherwise)
int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec);

private:
// Base metadata (nullptr for new tables)
const TableMetadata* base_;
Expand Down Expand Up @@ -572,6 +579,58 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
return new_order_id;
}

Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
if (spec_id == -1) {
if (!last_added_spec_id_.has_value()) {
return InvalidArgument(
"Cannot set last added partition spec: no partition spec has been added");
}
return SetDefaultPartitionSpec(last_added_spec_id_.value());
}

if (spec_id == metadata_.default_spec_id) {
return {};
}

metadata_.default_spec_id = spec_id;

changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id));
return {};
}

Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec& spec) {
int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec);

if (specs_by_id_.find(new_spec_id) != specs_by_id_.end()) {
// update last_added_spec_id if the spec was added in this set of changes (since it
// is now the last)
bool is_new_spec = last_added_spec_id_.has_value() &&
std::ranges::find_if(changes_, [new_spec_id](const auto& change) {
auto* add_spec =
dynamic_cast<table::AddPartitionSpec*>(change.get());
return add_spec && add_spec->spec()->spec_id() == new_spec_id;
}) != changes_.cend();
last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : std::nullopt;
return new_spec_id;
}

// Get current schema and validate the partition spec against it
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema());
ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, /*allow_missing_fields=*/false));

std::shared_ptr<PartitionSpec> new_spec;
ICEBERG_ASSIGN_OR_RAISE(
new_spec,
PartitionSpec::Make(new_spec_id, std::vector<PartitionField>(spec.fields().begin(),
spec.fields().end())));
metadata_.partition_specs.push_back(new_spec);
specs_by_id_.emplace(new_spec_id, new_spec);

changes_.push_back(std::make_unique<table::AddPartitionSpec>(new_spec));
last_added_spec_id_ = new_spec_id;
return new_spec_id;
}

Status TableMetadataBuilder::Impl::SetProperties(
const std::unordered_map<std::string, std::string>& updated) {
// If updated is empty, return early (no-op)
Expand Down Expand Up @@ -653,6 +712,20 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
return new_order_id;
}

int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
const PartitionSpec& new_spec) {
// determine the next spec id
int32_t new_spec_id = PartitionSpec::kInitialSpecId;
for (const auto& spec : metadata_.partition_specs) {
if (spec->SameSpec(new_spec)) {
return spec->spec_id();
} else if (new_spec_id <= spec->spec_id()) {
new_spec_id = spec->spec_id() + 1;
}
}
return new_spec_id;
}

TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
: impl_(std::make_unique<Impl>(format_version)) {}

Expand Down Expand Up @@ -723,16 +796,19 @@ TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema> sc

TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
std::shared_ptr<PartitionSpec> spec) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
return SetDefaultPartitionSpec(spec_id);
}

TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t spec_id) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultPartitionSpec(spec_id));
return *this;
}

TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
std::shared_ptr<PartitionSpec> spec) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
return *this;
}

TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
// AddPartitionSpec

void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.AddPartitionSpec(spec_);
}

void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
Expand All @@ -82,7 +82,7 @@ void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
// SetDefaultPartitionSpec

void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.SetDefaultPartitionSpec(spec_id_);
}

void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
transaction_test.cc
update_partition_spec_test.cc
update_properties_test.cc
update_sort_order_test.cc)

Expand Down
Loading
Loading