Skip to content

Commit 10d688d

Browse files
committed
feat: assign fresh field ids for new schema
1 parent 7b95952 commit 10d688d

File tree

8 files changed

+449
-21
lines changed

8 files changed

+449
-21
lines changed

src/iceberg/schema.cc

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,28 @@
3333

3434
namespace iceberg {
3535

36-
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
37-
: StructType(std::move(fields)), schema_id_(schema_id) {}
36+
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
37+
std::vector<int32_t> identifier_field_ids)
38+
: StructType(std::move(fields)),
39+
schema_id_(schema_id),
40+
identifier_field_ids_(std::move(identifier_field_ids)) {}
41+
42+
/// \brief Build a schema and resolve its identifier field names to field ids.
///
/// \param fields The fields that make up the schema.
/// \param schema_id The id assigned to the new schema.
/// \param identifier_field_names Names of the identifier fields; each name is looked
/// up with FindFieldByName on the newly constructed schema.
/// \return The new schema, or an InvalidSchema error if any name cannot be found.
/// Errors raised by the name lookup itself are propagated unchanged.
Result<std::unique_ptr<Schema>> Schema::Make(
    std::vector<SchemaField> fields, int32_t schema_id,
    const std::vector<std::string>& identifier_field_names) {
  auto schema = std::make_unique<Schema>(std::move(fields), schema_id);

  std::vector<int32_t> fresh_identifier_ids;
  // The final size is known up front, so allocate once instead of growing per name.
  fresh_identifier_ids.reserve(identifier_field_names.size());
  for (const auto& name : identifier_field_names) {
    ICEBERG_ASSIGN_OR_RAISE(auto field, schema->FindFieldByName(name));
    if (!field.has_value()) {
      return InvalidSchema("Cannot find identifier field: {}", name);
    }
    fresh_identifier_ids.push_back(field->get().field_id());
  }
  // The ids can only be resolved once the schema exists, so they are set afterwards.
  schema->identifier_field_ids_ = std::move(fresh_identifier_ids);
  return schema;
}
3858

3959
std::optional<int32_t> Schema::schema_id() const { return schema_id_; }
4060

@@ -48,15 +68,16 @@ std::string Schema::ToString() const {
4868
}
4969

5070
bool Schema::Equals(const Schema& other) const {
51-
return schema_id_ == other.schema_id_ && fields_ == other.fields_;
71+
return schema_id_ == other.schema_id_ && fields_ == other.fields_ &&
72+
identifier_field_ids_ == other.identifier_field_ids_;
5273
}
5374

5475
Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFieldByName(
5576
std::string_view name, bool case_sensitive) const {
5677
if (case_sensitive) {
57-
ICEBERG_ASSIGN_OR_RAISE(auto name_to_id, name_to_id_.Get(*this));
58-
auto it = name_to_id.get().find(name);
59-
if (it == name_to_id.get().end()) {
78+
ICEBERG_ASSIGN_OR_RAISE(auto name_id_map, name_id_map_.Get(*this));
79+
auto it = name_id_map.get().name_to_id.find(name);
80+
if (it == name_id_map.get().name_to_id.end()) {
6081
return std::nullopt;
6182
};
6283
return FindFieldById(it->second);
@@ -77,21 +98,22 @@ Schema::InitIdToFieldMap(const Schema& self) {
7798
return id_to_field;
7899
}
79100

80-
Result<std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>>
81-
Schema::InitNameToIdMap(const Schema& self) {
82-
std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>> name_to_id;
83-
NameToIdVisitor visitor(name_to_id, /*case_sensitive=*/true);
101+
Result<Schema::NameIdMap> Schema::InitNameIdMap(const Schema& self) {
102+
NameIdMap name_id_map;
103+
NameToIdVisitor visitor(name_id_map.name_to_id, &name_id_map.id_to_name,
104+
/*case_sensitive=*/true);
84105
ICEBERG_RETURN_UNEXPECTED(
85106
VisitTypeInline(self, &visitor, /*path=*/"", /*short_path=*/""));
86107
visitor.Finish();
87-
return name_to_id;
108+
return name_id_map;
88109
}
89110

90111
Result<std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>>
91112
Schema::InitLowerCaseNameToIdMap(const Schema& self) {
92113
std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>
93114
lowercase_name_to_id;
94-
NameToIdVisitor visitor(lowercase_name_to_id, /*case_sensitive=*/false);
115+
NameToIdVisitor visitor(lowercase_name_to_id, /*id_to_name=*/nullptr,
116+
/*case_sensitive=*/false);
95117
ICEBERG_RETURN_UNEXPECTED(
96118
VisitTypeInline(self, &visitor, /*path=*/"", /*short_path=*/""));
97119
visitor.Finish();
@@ -108,6 +130,16 @@ Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFie
108130
return it->second;
109131
}
110132

133+
/// \brief Look up the name stored for a field id in the schema's id_to_name map.
///
/// \param field_id The id of the field whose name is requested.
/// \return The stored name, std::nullopt if the id is not in the map, or the error
/// produced while obtaining the name/id map.
Result<std::optional<std::string_view>> Schema::FindColumnNameById(
    int32_t field_id) const {
  ICEBERG_ASSIGN_OR_RAISE(auto maps, name_id_map_.Get(*this));
  const auto& names_by_id = maps.get().id_to_name;
  if (auto entry = names_by_id.find(field_id); entry != names_by_id.end()) {
    return entry->second;
  }
  return std::nullopt;
}
142+
111143
Result<std::unordered_map<int32_t, std::vector<size_t>>> Schema::InitIdToPositionPath(
112144
const Schema& self) {
113145
PositionPathVisitor visitor;
@@ -179,4 +211,21 @@ Result<std::unique_ptr<Schema>> Schema::Project(
179211
std::nullopt);
180212
}
181213

214+
const std::vector<int32_t>& Schema::IdentifierFieldIds() const {
215+
return identifier_field_ids_;
216+
}
217+
218+
/// \brief Translate every identifier field id into its column name.
///
/// \return The names in the same order as identifier_field_ids_, or an InvalidSchema
/// error if FindColumnNameById finds no name for one of the ids. Errors from the lookup
/// itself are propagated.
Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
  std::vector<std::string> resolved;
  resolved.reserve(identifier_field_ids_.size());
  for (int32_t field_id : identifier_field_ids_) {
    ICEBERG_ASSIGN_OR_RAISE(auto column_name, FindColumnNameById(field_id));
    if (!column_name) {
      return InvalidSchema("Cannot find the field of the specified field id: {}",
                           field_id);
    }
    // Copy the view into an owned string so the result does not alias the schema.
    resolved.emplace_back(*column_name);
  }
  return resolved;
}
230+
182231
} // namespace iceberg

src/iceberg/schema.h

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,19 @@ class ICEBERG_EXPORT Schema : public StructType {
4949
static constexpr int32_t kInvalidColumnId = -1;
5050

5151
explicit Schema(std::vector<SchemaField> fields,
52-
std::optional<int32_t> schema_id = std::nullopt);
52+
std::optional<int32_t> schema_id = std::nullopt,
53+
std::vector<int32_t> identifier_field_ids = {});
54+
55+
/// \brief Create a schema.
56+
///
57+
/// \param fields The fields that make up the schema.
58+
/// \param schema_id The unique identifier for this schema (default: kInitialSchemaId).
59+
/// \param identifier_field_names Full names of fields that uniquely identify rows in
60+
/// the table (default: empty).
61+
/// \return A new Schema instance, or an InvalidSchema error if an identifier field name cannot be found.
62+
static Result<std::unique_ptr<Schema>> Make(
63+
std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
64+
const std::vector<std::string>& identifier_field_names = {});
5365

5466
/// \brief Get the schema ID.
5567
///
@@ -78,6 +90,12 @@ class ICEBERG_EXPORT Schema : public StructType {
7890
Result<std::optional<std::reference_wrapper<const SchemaField>>> FindFieldById(
7991
int32_t field_id) const;
8092

93+
/// \brief Returns the full column name for the given id.
94+
///
95+
/// \param field_id The id of the field to get the full name for.
96+
/// \return The full name of the field with the given id, or std::nullopt if not found.
97+
Result<std::optional<std::string_view>> FindColumnNameById(int32_t field_id) const;
98+
8199
/// \brief Get the accessor to access the field by field id.
82100
///
83101
/// \param field_id The id of the field to get the accessor for.
@@ -103,26 +121,45 @@ class ICEBERG_EXPORT Schema : public StructType {
103121
Result<std::unique_ptr<Schema>> Project(
104122
const std::unordered_set<int32_t>& field_ids) const;
105123

124+
const std::vector<int32_t>& IdentifierFieldIds() const;
125+
Result<std::vector<std::string>> IdentifierFieldNames() const;
126+
106127
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
107128

108129
private:
109130
/// \brief Compare two schemas for equality.
110131
bool Equals(const Schema& other) const;
111132

133+
struct NameIdMap {
134+
/// \brief Mapping from full field name to ID
135+
///
136+
/// \note Short names for maps and lists are included for any name that does not
137+
/// conflict with a canonical name. For example, a list, 'l', of structs with field
138+
/// 'x' will produce short name 'l.x' in addition to canonical name 'l.element.x'.
139+
std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>> name_to_id;
140+
141+
/// \brief Mapping from field ID to full name
142+
///
143+
/// \note Canonical names, but not short names are set, for example
144+
/// 'list.element.field' instead of 'list.field'.
145+
std::unordered_map<int32_t, std::string> id_to_name;
146+
};
147+
112148
static Result<std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>>
113149
InitIdToFieldMap(const Schema&);
114-
static Result<std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>>
115-
InitNameToIdMap(const Schema&);
150+
static Result<NameIdMap> InitNameIdMap(const Schema&);
116151
static Result<std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>>
117152
InitLowerCaseNameToIdMap(const Schema&);
118153
static Result<std::unordered_map<int32_t, std::vector<size_t>>> InitIdToPositionPath(
119154
const Schema&);
120155

121156
const std::optional<int32_t> schema_id_;
157+
/// Field IDs that uniquely identify rows in the table.
158+
std::vector<int32_t> identifier_field_ids_;
122159
/// Mapping from field id to field.
123160
Lazy<InitIdToFieldMap> id_to_field_;
124161
/// Mappings between full field names and field ids, in both directions.
125-
Lazy<InitNameToIdMap> name_to_id_;
162+
Lazy<InitNameIdMap> name_id_map_;
126163
/// Mapping from lowercased field name to field id
127164
Lazy<InitLowerCaseNameToIdMap> lowercase_name_to_id_;
128165
/// Mapping from field id to (nested) position path to access the field.

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ endfunction()
5454

5555
add_iceberg_test(schema_test
5656
SOURCES
57+
assign_id_visitor_test.cc
5758
name_mapping_test.cc
5859
partition_field_test.cc
5960
partition_spec_test.cc
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <atomic>
#include <memory>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/test/matchers.h"
#include "iceberg/type.h"
#include "iceberg/util/type_util.h"
30+
31+
namespace iceberg {
32+
33+
namespace {
34+
35+
Schema CreateFlatSchema() {
36+
return Schema({
37+
SchemaField::MakeRequired(/*field_id=*/10, "id", iceberg::int64()),
38+
SchemaField::MakeOptional(/*field_id=*/20, "name", iceberg::string()),
39+
SchemaField::MakeOptional(/*field_id=*/30, "age", iceberg::int32()),
40+
SchemaField::MakeRequired(/*field_id=*/40, "data", iceberg::float64()),
41+
});
42+
}
43+
44+
std::shared_ptr<Type> CreateListOfStruct() {
45+
return std::make_shared<ListType>(SchemaField::MakeOptional(
46+
/*field_id=*/101, "element",
47+
std::make_shared<StructType>(std::vector<SchemaField>{
48+
SchemaField::MakeOptional(/*field_id=*/102, "x", iceberg::int32()),
49+
SchemaField::MakeRequired(/*field_id=*/103, "y", iceberg::string()),
50+
})));
51+
}
52+
53+
std::shared_ptr<Type> CreateMapWithStructValue() {
54+
return std::make_shared<MapType>(
55+
SchemaField::MakeRequired(/*field_id=*/201, "key", iceberg::string()),
56+
SchemaField::MakeRequired(
57+
/*field_id=*/202, "value",
58+
std::make_shared<StructType>(std::vector<SchemaField>{
59+
SchemaField::MakeRequired(/*field_id=*/203, "id", iceberg::int64()),
60+
SchemaField::MakeOptional(/*field_id=*/204, "name", iceberg::string()),
61+
})));
62+
}
63+
64+
std::shared_ptr<Type> CreateNestedStruct() {
65+
return std::make_shared<StructType>(std::vector<SchemaField>{
66+
SchemaField::MakeRequired(/*field_id=*/301, "outer_id", iceberg::int64()),
67+
SchemaField::MakeRequired(
68+
/*field_id=*/302, "nested",
69+
std::make_shared<StructType>(std::vector<SchemaField>{
70+
SchemaField::MakeOptional(/*field_id=*/303, "inner_id", iceberg::int32()),
71+
SchemaField::MakeRequired(/*field_id=*/304, "inner_name",
72+
iceberg::string()),
73+
})),
74+
});
75+
}
76+
77+
Schema CreateNestedSchema(std::vector<int32_t> identifier_field_ids = {}) {
78+
return Schema(
79+
{
80+
SchemaField::MakeRequired(/*field_id=*/10, "id", iceberg::int64()),
81+
SchemaField::MakeOptional(/*field_id=*/20, "list", CreateListOfStruct()),
82+
SchemaField::MakeOptional(/*field_id=*/30, "map", CreateMapWithStructValue()),
83+
SchemaField::MakeRequired(/*field_id=*/40, "struct", CreateNestedStruct()),
84+
},
85+
Schema::kInitialSchemaId, std::move(identifier_field_ids));
86+
}
87+
88+
} // namespace
89+
90+
TEST(AssignFreshIdVisitorTest, FlatSchema) {
91+
Schema schema = CreateFlatSchema();
92+
93+
std::atomic<int32_t> id = 0;
94+
auto next_id = [&id]() { return ++id; };
95+
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
96+
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
97+
98+
ASSERT_EQ(fresh_schema->fields().size(), schema.fields().size());
99+
EXPECT_EQ(Schema(
100+
{
101+
SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()),
102+
SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()),
103+
SchemaField::MakeOptional(/*field_id=*/3, "age", iceberg::int32()),
104+
SchemaField::MakeRequired(/*field_id=*/4, "data", iceberg::float64()),
105+
},
106+
Schema::kInitialSchemaId),
107+
*fresh_schema);
108+
}
109+
110+
// Verifies the ids AssignFreshIds hands out for a schema with list, map and struct
// columns. With a counter starting at 1, the expectations below are: top-level fields
// get 1..4, then the list's element and its struct fields get 5..7, the map's key,
// value and value-struct fields get 8..11, and the nested struct's fields get 12..15.
TEST(AssignFreshIdVisitorTest, NestedSchema) {
  Schema schema = CreateNestedSchema();
  std::atomic<int32_t> id = 0;
  auto next_id = [&id]() { return ++id; };
  ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
                         AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));

  // Unsigned literal: size() is unsigned, so avoid a signed/unsigned comparison.
  ASSERT_EQ(4u, fresh_schema->fields().size());
  // Top-level fields are expected to be numbered 1..4 in declaration order. Iterating
  // the fields directly avoids comparing a signed index against an unsigned size.
  int32_t expected_id = 1;
  for (const auto& field : fresh_schema->fields()) {
    EXPECT_EQ(expected_id, field.field_id());
    ++expected_id;
  }

  // list<struct<x, y>>: element id, then the element struct's fields.
  auto list_field = fresh_schema->fields()[1];
  auto list_type = std::dynamic_pointer_cast<ListType>(list_field.type());
  ASSERT_TRUE(list_type);
  auto list_element_field = list_type->fields()[0];
  EXPECT_EQ(5, list_element_field.field_id());
  auto list_element_type =
      std::dynamic_pointer_cast<StructType>(list_element_field.type());
  ASSERT_TRUE(list_element_type);
  EXPECT_EQ(StructType(std::vector<SchemaField>{
                SchemaField::MakeOptional(/*field_id=*/6, "x", iceberg::int32()),
                SchemaField::MakeRequired(/*field_id=*/7, "y", iceberg::string()),
            }),
            *list_element_type);

  // map<string, struct<id, name>>: key id, value id, then the value struct's fields.
  auto map_field = fresh_schema->fields()[2];
  auto map_type = std::dynamic_pointer_cast<MapType>(map_field.type());
  ASSERT_TRUE(map_type);
  EXPECT_EQ(8, map_type->fields()[0].field_id());
  auto map_value_field = map_type->fields()[1];
  EXPECT_EQ(9, map_value_field.field_id());
  auto map_value_type = std::dynamic_pointer_cast<StructType>(map_value_field.type());
  ASSERT_TRUE(map_value_type);
  EXPECT_EQ(StructType(std::vector<SchemaField>{
                SchemaField::MakeRequired(/*field_id=*/10, "id", iceberg::int64()),
                SchemaField::MakeOptional(/*field_id=*/11, "name", iceberg::string()),
            }),
            *map_value_type);

  // struct<outer_id, nested<inner_id, inner_name>>: both fields of the outer struct
  // (12, 13) are expected to be numbered before the inner struct's fields (14, 15).
  auto struct_field = fresh_schema->fields()[3];
  auto struct_type = std::dynamic_pointer_cast<StructType>(struct_field.type());
  ASSERT_TRUE(struct_type);

  auto expect_nested_struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
      SchemaField::MakeOptional(/*field_id=*/14, "inner_id", iceberg::int32()),
      SchemaField::MakeRequired(/*field_id=*/15, "inner_name", iceberg::string()),
  });
  EXPECT_EQ(StructType(std::vector<SchemaField>{
                SchemaField::MakeRequired(/*field_id=*/12, "outer_id", iceberg::int64()),
                SchemaField::MakeRequired(
                    /*field_id=*/13, "nested", expect_nested_struct_type)}),
            *struct_type);

  auto nested_struct_field = struct_type->fields()[1];
  auto nested_struct_type =
      std::dynamic_pointer_cast<StructType>(nested_struct_field.type());
  ASSERT_TRUE(nested_struct_type);
  EXPECT_EQ(*expect_nested_struct_type, *nested_struct_type);
}
170+
171+
TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {
172+
std::atomic<int32_t> id = 0;
173+
auto next_id = [&id]() { return ++id; };
174+
175+
Schema invalid_schema = CreateNestedSchema({10, 400});
176+
// Invalid identifier field id: 400 does not exist in the schema
177+
auto result = AssignFreshIds(Schema::kInitialSchemaId, invalid_schema, next_id);
178+
EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
179+
EXPECT_THAT(result, HasErrorMessage("Cannot find"));
180+
181+
id = 0;
182+
Schema schema = CreateNestedSchema({10, 301});
183+
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
184+
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
185+
EXPECT_THAT(fresh_schema->IdentifierFieldIds(), testing::ElementsAre(1, 12));
186+
}
187+
188+
} // namespace iceberg

0 commit comments

Comments
 (0)