Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 77 additions & 9 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/transaction.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -318,7 +319,7 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
ICEBERG_RETURN_UNEXPECTED(ns);
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
if (it == ns.value()->table_metadata_locations_.end()) {
return NotFound("{} does not exist", table_ident.name);
return NotFound("Table does not exist: {}", table_ident);
}
return it->second;
}
Expand Down Expand Up @@ -405,25 +406,79 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
return NotImplemented("create table");
if (root_namespace_->TableExists(identifier).value_or(false)) {
return AlreadyExists("Table already exists: {}", identifier);
}

std::string base_location =
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;

ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema, *spec, *order,
location, properties));

ICEBERG_ASSIGN_OR_RAISE(
auto metadata_file_location,
TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata));
ICEBERG_RETURN_UNEXPECTED(
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
return Table::Make(identifier, std::move(table_metadata),
std::move(metadata_file_location), file_io_,
std::static_pointer_cast<Catalog>(shared_from_this()));
}

/// \brief Determine whether a commit's requirements describe a table creation.
///
/// A commit is treated as a create when at least one requirement is a
/// table::AssertDoesNotExist. In that case every requirement has to be of
/// that type; a create mixed with any other requirement fails the
/// ICEBERG_PRECHECK below.
///
/// \param requirements The requirements attached to the commit.
/// \return true when the requirements describe a create, false otherwise.
Result<bool> IsCreate(
    const std::vector<std::unique_ptr<TableRequirement>>& requirements) {
  const auto asserts_absence = [](const auto& req) {
    return dynamic_cast<table::AssertDoesNotExist*>(req.get()) != nullptr;
  };

  const bool is_create = std::ranges::any_of(requirements, asserts_absence);
  if (is_create) {
    // The requirements are held by unique_ptr and cannot be copied into a
    // scratch vector, so the remaining entries are checked in place.
    const bool only_create_requirements =
        std::ranges::all_of(requirements, asserts_absence);
    ICEBERG_PRECHECK(only_create_requirements, "Invalid create requirements");
  }

  return is_create;
}

Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
std::unique_lock lock(mutex_);
ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
root_namespace_->GetTableMetadataLocation(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto is_create, IsCreate(requirements));
std::unique_ptr<TableMetadata> base;
std::string base_metadata_location;
std::unique_ptr<TableMetadataBuilder> builder;
if (is_create) {
// TODO(zhuo.wang) Construct empty tablemetadata
int8_t format_version = TableMetadata::kDefaultTableFormatVersion;
auto it = std::ranges::find_if(updates, [](const auto& update) {
return dynamic_cast<const table::UpgradeFormatVersion*>(update.get()) != nullptr;
});
if (it != updates.end()) {
const auto* upgrade_version =
dynamic_cast<const table::UpgradeFormatVersion*>(it->get());
format_version = upgrade_version->format_version();
}

ICEBERG_ASSIGN_OR_RAISE(auto base,
TableMetadataUtil::Read(*file_io_, base_metadata_location));
builder = TableMetadataBuilder::BuildFromEmpty(format_version);
} else {
ICEBERG_ASSIGN_OR_RAISE(base_metadata_location,
root_namespace_->GetTableMetadataLocation(identifier));
ICEBERG_ASSIGN_OR_RAISE(base,
TableMetadataUtil::Read(*file_io_, base_metadata_location));
builder = TableMetadataBuilder::BuildFrom(base.get());
}

for (const auto& requirement : requirements) {
ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
}

auto builder = TableMetadataBuilder::BuildFrom(base.get());
for (const auto& update : updates) {
update->ApplyTo(*builder);
}
Expand All @@ -445,7 +500,20 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
return NotImplemented("stage create table");
if (root_namespace_->TableExists(identifier).value_or(false)) {
return AlreadyExists("Table already exists: {}", identifier);
}

std::string base_location =
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;

ICEBERG_ASSIGN_OR_RAISE(
auto table_metadata,
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
ICEBERG_ASSIGN_OR_RAISE(
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
shared_from_this()));
return Transaction::Make(std::move(table), Transaction::Kind::kCreate, false);
}

Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
Expand Down Expand Up @@ -495,7 +563,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(

std::unique_lock lock(mutex_);
if (!root_namespace_->NamespaceExists(identifier.ns)) {
return NoSuchNamespace("table namespace does not exist.");
return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns);
}
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
return UnknownError("The registry failed.");
Expand Down
32 changes: 32 additions & 0 deletions src/iceberg/table_identifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
/// \file iceberg/table_identifier.h
/// A TableIdentifier is a unique identifier for a table

#include <format>
#include <sstream>
#include <string>
#include <vector>

Expand All @@ -35,6 +37,15 @@ struct ICEBERG_EXPORT Namespace {
std::vector<std::string> levels;

bool operator==(const Namespace& other) const { return levels == other.levels; }

std::string ToString() const {
std::ostringstream oss;
for (size_t i = 0; i < levels.size(); ++i) {
if (i) oss << '.';
oss << levels[i];
}
return oss.str();
}
};

/// \brief Identifies a table in iceberg catalog.
Expand All @@ -53,6 +64,27 @@ struct ICEBERG_EXPORT TableIdentifier {
}
return {};
}

/// \brief Render the identifier as "<namespace>.<name>".
///
/// When the namespace has no levels only the table name is returned, so a
/// root-level table is not rendered with a leading '.'.
std::string ToString() const {
  if (ns.levels.empty()) {
    return name;
  }
  return ns.ToString() + '.' + name;
}
};

} // namespace iceberg

namespace std {

/// \brief std::format support for iceberg::Namespace.
///
/// The value is formatted through Namespace::ToString(). parse() is inherited
/// from std::formatter<std::string>, so the standard string format
/// specification (fill, alignment, width, precision) is accepted instead of
/// only the empty "{}" form.
template <>
struct formatter<iceberg::Namespace> : std::formatter<std::string> {
  auto format(const iceberg::Namespace& ns, format_context& ctx) const {
    return std::formatter<std::string>::format(ns.ToString(), ctx);
  }
};

/// \brief std::format support for iceberg::TableIdentifier.
///
/// The value is formatted through TableIdentifier::ToString(). parse() is
/// inherited from std::formatter<std::string>, so the standard string format
/// specification (fill, alignment, width, precision) is accepted instead of
/// only the empty "{}" form.
template <>
struct formatter<iceberg::TableIdentifier> : std::formatter<std::string> {
  auto format(const iceberg::TableIdentifier& id, format_context& ctx) const {
    return std::formatter<std::string>::format(id.ToString(), ctx);
  }
};
} // namespace std
110 changes: 109 additions & 1 deletion src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "iceberg/table_metadata.h"

#include <algorithm>
#include <atomic>
#include <charconv>
#include <chrono>
#include <cstdint>
Expand All @@ -36,6 +37,7 @@
#include "iceberg/exception.h"
#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
#include "iceberg/metrics_config.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
Expand All @@ -47,6 +49,7 @@
#include "iceberg/util/gzip_internal.h"
#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/type_util.h"
#include "iceberg/util/uuid.h"
namespace iceberg {
namespace {
Expand All @@ -65,6 +68,106 @@ std::string ToString(const MetadataLogEntry& entry) {
entry.metadata_file);
}

/// \brief Rebuild a partition spec so it refers to the ids of a re-numbered schema.
///
/// For every partition field, the source column is resolved to its name in
/// `base_schema`; that name is then looked up in `fresh_schema` to obtain the
/// new source id. The partition field keeps its own name and transform and is
/// given a new, sequentially assigned partition field id.
///
/// \param spec_id Id of the spec to create.
/// \param fresh_schema Schema carrying the newly assigned column ids.
/// \param base_schema Schema the ids in `spec` were assigned against.
/// \param spec The partition spec to rebuild.
/// \return The rebuilt spec, or InvalidSchema when a source column cannot be
///         resolved in either schema.
Result<std::shared_ptr<PartitionSpec>> FreshPartitionSpec(int32_t spec_id,
                                                          const Schema& fresh_schema,
                                                          const Schema& base_schema,
                                                          const PartitionSpec& spec) {
  // NOTE(review): ids are handed out from 0 here; the Java reference
  // implementation starts partition field ids at 1000 - confirm which is intended.
  int32_t last_assigned_field_id = -1;
  std::vector<PartitionField> partition_fields;
  for (const auto& field : spec.fields()) {
    // The column a partition field is derived from is identified by its source
    // id; field_id() is the id of the partition field itself.
    ICEBERG_ASSIGN_OR_RAISE(auto source_name,
                            base_schema.FindColumnNameById(field.source_id()));
    if (!source_name.has_value()) [[unlikely]] {
      return InvalidSchema("Partition source id {} does not exist in the schema",
                           field.source_id());
    }
    ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
                            fresh_schema.FindFieldByName(source_name.value()));
    if (!fresh_field.has_value()) [[unlikely]] {
      return InvalidSchema("Partition field {} does not exist in the schema",
                           source_name.value());
    }
    // Keep the partition field's own name rather than the source column's.
    partition_fields.emplace_back(fresh_field.value().get().field_id(),
                                  ++last_assigned_field_id, std::string(field.name()),
                                  field.transform());
  }
  return PartitionSpec::Make(fresh_schema, spec_id, std::move(partition_fields), false,
                             last_assigned_field_id);
}

/// \brief Rebuild a sort order so it refers to the ids of a re-numbered schema.
///
/// Each sort field's source id is resolved to a column name in the schema the
/// order was defined against, and that name is looked up in `schema` to obtain
/// the new source id. Transform, direction and null order are carried over.
///
/// \param order_id Id of the sort order to create.
/// \param schema Schema carrying the newly assigned column ids.
/// \param order The sort order to rebuild.
/// \param base_schema Schema the ids in `order` were assigned against. When
///        null, `schema` is used for both lookups.
/// \return The rebuilt order (the unsorted order is returned as is), or
///         InvalidSchema when a source column cannot be resolved.
Result<std::shared_ptr<SortOrder>> FreshSortOrder(int32_t order_id, const Schema& schema,
                                                  const SortOrder& order,
                                                  const Schema* base_schema = nullptr) {
  if (order.is_unsorted()) {
    return SortOrder::Unsorted();
  }

  // Source ids in `order` belong to the old schema; looking them up in the
  // re-numbered schema could resolve to the wrong column.
  const Schema& old_schema = base_schema != nullptr ? *base_schema : schema;

  std::vector<SortField> fresh_fields;
  for (const auto& field : order.fields()) {
    ICEBERG_ASSIGN_OR_RAISE(auto source_name,
                            old_schema.FindColumnNameById(field.source_id()));
    if (!source_name.has_value()) {
      return InvalidSchema("Unable to find source field with ID {} in the old schema",
                           field.source_id());
    }

    ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
                            schema.FindFieldByName(source_name.value()));
    if (!fresh_field.has_value()) {
      return InvalidSchema("Unable to find field '{}' in the new schema",
                           source_name.value());
    }

    int32_t new_source_id = fresh_field.value().get().field_id();
    fresh_fields.emplace_back(new_source_id, field.transform(), field.direction(),
                              field.null_order());
  }

  return SortOrder::Make(order_id, std::move(fresh_fields));
}

/// \brief Build the metadata of a new table.
///
/// Reserved table properties are rejected, all column ids of `schema` are
/// reassigned, the partition spec and sort order are rebuilt against the new
/// ids, and the metrics configuration in `properties` is verified against the
/// re-numbered schema before the metadata is assembled.
///
/// \param schema Schema of the new table.
/// \param spec Partition spec defined against `schema`.
/// \param sort_order Sort order defined against `schema`.
/// \param location Table location stored in the metadata.
/// \param properties Table properties; must not contain reserved keys.
/// \param format_version Format version passed to the metadata builder.
/// \return The new metadata, or the first error encountered.
Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
    const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
    const iceberg::SortOrder& sort_order, const std::string& location,
    const std::unordered_map<std::string, std::string>& properties, int format_version) {
  for (const auto& [key, _] : properties) {
    if (TableProperties::reserved_properties().contains(key)) {
      return InvalidArgument(
          "Table properties should not contain reserved properties, but got {}", key);
    }
  }

  // Reassign all column ids to ensure consistency. The counter is only used
  // through `next_id` within this call, so a plain integer is sufficient.
  int32_t last_column_id = 0;
  auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };
  ICEBERG_ASSIGN_OR_RAISE(auto fresh_schema,
                          AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));

  // Rebuild the partition spec using the new column ids.
  ICEBERG_ASSIGN_OR_RAISE(
      auto fresh_spec,
      FreshPartitionSpec(PartitionSpec::kInitialSpecId, *fresh_schema, schema, spec));

  // Rebuild the sort order using the new column ids; its source ids are
  // resolved against the original schema.
  int32_t fresh_order_id =
      sort_order.is_unsorted() ? sort_order.order_id() : SortOrder::kInitialSortOrderId;
  ICEBERG_ASSIGN_OR_RAISE(
      auto fresh_order,
      FreshSortOrder(fresh_order_id, *fresh_schema, sort_order, &schema));

  // Validate the metrics configuration.
  ICEBERG_RETURN_UNEXPECTED(
      MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema));

  // TODO(anyone) Validate commit properties

  return TableMetadataBuilder::BuildFromEmpty(format_version)
      ->SetLocation(location)
      .AddSchema(std::move(fresh_schema))
      .AddPartitionSpec(std::move(fresh_spec))
      .AddSortOrder(std::move(fresh_order))
      .SetProperties(properties)
      .Build();
}

Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
return SchemaById(current_schema_id);
}
Expand Down Expand Up @@ -405,6 +508,10 @@ class TableMetadataBuilder::Impl {
const TableMetadata* base() const { return base_; }
const TableMetadata& metadata() const { return metadata_; }

// Stores a copy of `location` as the location of the metadata being built.
void SetLocation(std::string_view location) {
  std::string owned_location{location};
  metadata_.location = std::move(owned_location);
}

void SetMetadataLocation(std::string_view metadata_location) {
metadata_location_ = std::string(metadata_location);
if (base_ != nullptr) {
Expand Down Expand Up @@ -826,7 +933,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
}

// Sets the table location on the metadata being built by forwarding to the
// implementation object. Returns this builder so calls can be chained.
TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) {
  impl_->SetLocation(location);
  return *this;
}

TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey(
Expand Down
6 changes: 6 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ struct ICEBERG_EXPORT TableMetadata {
/// A `long` higher than all assigned row IDs
int64_t next_row_id;

/// \brief Create the metadata for a new table.
///
/// The implementation rejects reserved table properties with InvalidArgument,
/// assigns fresh column ids to the schema, rebuilds the partition spec and
/// sort order against those ids, and verifies the metrics configuration in
/// `properties` against the re-numbered schema.
///
/// \param schema Schema of the new table
/// \param spec Partition spec defined against `schema`
/// \param sort_order Sort order defined against `schema`
/// \param location Table location stored in the metadata
/// \param properties Table properties; must not contain reserved properties
/// \param format_version Format version handed to the metadata builder
/// \return The new table metadata, or an error if validation fails
static Result<std::unique_ptr<TableMetadata>> Make(
const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
const iceberg::SortOrder& sort_order, const std::string& location,
const std::unordered_map<std::string, std::string>& properties,
int format_version = kDefaultTableFormatVersion);

/// \brief Get the current schema, return NotFoundError if not found
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
/// \brief Get the current schema by ID, return NotFoundError if not found
Expand Down
15 changes: 15 additions & 0 deletions src/iceberg/test/in_memory_catalog_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
Expand Down Expand Up @@ -106,6 +108,19 @@ TEST_F(InMemoryCatalogTest, TableExists) {
EXPECT_THAT(result, HasValue(::testing::Eq(false)));
}

// Creating a table must succeed and leave the table visible through the catalog.
TEST_F(InMemoryCatalogTest, CreateTable) {
  TableIdentifier table_ident{.ns = {}, .name = "t1"};
  auto schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64())},
      /*schema_id=*/1);
  auto spec = PartitionSpec::Unpartitioned();

  auto table = catalog_->CreateTable(table_ident, schema, spec, SortOrder::Unsorted(),
                                     GenerateTestTableLocation(table_ident.name),
                                     {{"property1", "value1"}});
  ASSERT_THAT(table, IsOk());

  // A successful create has to register the table, not just return a handle.
  EXPECT_THAT(catalog_->TableExists(table_ident), HasValue(::testing::Eq(true)));
}

TEST_F(InMemoryCatalogTest, RegisterTable) {
TableIdentifier tableIdent{.ns = {}, .name = "t1"};

Expand Down
Loading