Skip to content

Commit 5b11c46

Browse files
committed
feat: add projection and filtering to manifest reader
- Consolidate `ManifestReader` implementation into `manifest_reader.cc` and remove `manifest_reader_internal.cc`.
- Implement fluent API for column selection, partition filtering, and row filtering.
- Support lazy initialization of the underlying Avro reader.
- Add support for various kinds of entry filtering.
1 parent dba8f92 commit 5b11c46

14 files changed

+1743
-763
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ set(ICEBERG_SOURCES
4242
manifest/manifest_entry.cc
4343
manifest/manifest_list.cc
4444
manifest/manifest_reader.cc
45-
manifest/manifest_reader_internal.cc
4645
manifest/manifest_writer.cc
4746
manifest/v1_metadata.cc
4847
manifest/v2_metadata.cc

src/iceberg/manifest/manifest_entry.h

Lines changed: 90 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -178,94 +178,114 @@ struct ICEBERG_EXPORT DataFile {
178178
/// present
179179
std::optional<int64_t> content_size_in_bytes;
180180

181+
inline static constexpr int32_t kContentFieldId = 134;
181182
inline static const SchemaField kContent = SchemaField::MakeOptional(
182-
134, "content", iceberg::int32(),
183+
kContentFieldId, "content", int32(),
183184
"Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
185+
186+
inline static constexpr int32_t kFilePathFieldId = 100;
184187
inline static const SchemaField kFilePath = SchemaField::MakeRequired(
185-
100, "file_path", iceberg::string(), "Location URI with FS scheme");
186-
inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
187-
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
188-
inline static const int32_t kPartitionFieldId = 102;
188+
kFilePathFieldId, "file_path", string(), "Location URI with FS scheme");
189+
190+
inline static constexpr int32_t kFileFormatFieldId = 101;
191+
inline static const SchemaField kFileFormat =
192+
SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
193+
"File format name: avro, orc, or parquet");
194+
195+
inline static constexpr int32_t kPartitionFieldId = 102;
189196
inline static const std::string kPartitionField = "partition";
190197
inline static const std::string kPartitionDoc =
191198
"Partition data tuple, schema based on the partition spec";
199+
200+
inline static constexpr int32_t kRecordCountFieldId = 103;
192201
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
193-
103, "record_count", iceberg::int64(), "Number of records in the file");
202+
kRecordCountFieldId, "record_count", int64(), "Number of records in the file");
203+
204+
inline static constexpr int32_t kFileSizeFieldId = 104;
194205
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
195-
104, "file_size_in_bytes", iceberg::int64(), "Total file size in bytes");
206+
kFileSizeFieldId, "file_size_in_bytes", int64(), "Total file size in bytes");
207+
208+
inline static constexpr int32_t kColumnSizesFieldId = 108;
196209
inline static const SchemaField kColumnSizes = SchemaField::MakeOptional(
197-
108, "column_sizes",
198-
std::make_shared<MapType>(
199-
SchemaField::MakeRequired(117, std::string(MapType::kKeyName),
200-
iceberg::int32()),
201-
SchemaField::MakeRequired(118, std::string(MapType::kValueName),
202-
iceberg::int64())),
210+
kColumnSizesFieldId, "column_sizes",
211+
map(SchemaField::MakeRequired(117, std::string(MapType::kKeyName), int32()),
212+
SchemaField::MakeRequired(118, std::string(MapType::kValueName), int64())),
203213
"Map of column id to total size on disk");
214+
215+
inline static constexpr int32_t kValueCountsFieldId = 109;
204216
inline static const SchemaField kValueCounts = SchemaField::MakeOptional(
205-
109, "value_counts",
206-
std::make_shared<MapType>(
207-
SchemaField::MakeRequired(119, std::string(MapType::kKeyName),
208-
iceberg::int32()),
209-
SchemaField::MakeRequired(120, std::string(MapType::kValueName),
210-
iceberg::int64())),
217+
kValueCountsFieldId, "value_counts",
218+
map(SchemaField::MakeRequired(119, std::string(MapType::kKeyName), int32()),
219+
SchemaField::MakeRequired(120, std::string(MapType::kValueName), int64())),
211220
"Map of column id to total count, including null and NaN");
221+
222+
inline static constexpr int32_t kNullValueCountsFieldId = 110;
212223
inline static const SchemaField kNullValueCounts = SchemaField::MakeOptional(
213-
110, "null_value_counts",
214-
std::make_shared<MapType>(
215-
SchemaField::MakeRequired(121, std::string(MapType::kKeyName),
216-
iceberg::int32()),
217-
SchemaField::MakeRequired(122, std::string(MapType::kValueName),
218-
iceberg::int64())),
224+
kNullValueCountsFieldId, "null_value_counts",
225+
map(SchemaField::MakeRequired(121, std::string(MapType::kKeyName), int32()),
226+
SchemaField::MakeRequired(122, std::string(MapType::kValueName), int64())),
219227
"Map of column id to null value count");
228+
229+
inline static constexpr int32_t kNanValueCountsFieldId = 137;
220230
inline static const SchemaField kNanValueCounts = SchemaField::MakeOptional(
221-
137, "nan_value_counts",
222-
std::make_shared<MapType>(
223-
SchemaField::MakeRequired(138, std::string(MapType::kKeyName),
224-
iceberg::int32()),
225-
SchemaField::MakeRequired(139, std::string(MapType::kValueName),
226-
iceberg::int64())),
231+
kNanValueCountsFieldId, "nan_value_counts",
232+
map(SchemaField::MakeRequired(138, std::string(MapType::kKeyName), int32()),
233+
SchemaField::MakeRequired(139, std::string(MapType::kValueName), int64())),
227234
"Map of column id to number of NaN values in the column");
235+
236+
inline static constexpr int32_t kLowerBoundsFieldId = 125;
228237
inline static const SchemaField kLowerBounds = SchemaField::MakeOptional(
229-
125, "lower_bounds",
230-
std::make_shared<MapType>(
231-
SchemaField::MakeRequired(126, std::string(MapType::kKeyName),
232-
iceberg::int32()),
233-
SchemaField::MakeRequired(127, std::string(MapType::kValueName),
234-
iceberg::binary())),
238+
kLowerBoundsFieldId, "lower_bounds",
239+
map(SchemaField::MakeRequired(126, std::string(MapType::kKeyName), int32()),
240+
SchemaField::MakeRequired(127, std::string(MapType::kValueName), binary())),
235241
"Map of column id to lower bound");
242+
243+
inline static constexpr int32_t kUpperBoundsFieldId = 128;
236244
inline static const SchemaField kUpperBounds = SchemaField::MakeOptional(
237-
128, "upper_bounds",
238-
std::make_shared<MapType>(
239-
SchemaField::MakeRequired(129, std::string(MapType::kKeyName),
240-
iceberg::int32()),
241-
SchemaField::MakeRequired(130, std::string(MapType::kValueName),
242-
iceberg::binary())),
245+
kUpperBoundsFieldId, "upper_bounds",
246+
map(SchemaField::MakeRequired(129, std::string(MapType::kKeyName), int32()),
247+
SchemaField::MakeRequired(130, std::string(MapType::kValueName), binary())),
243248
"Map of column id to upper bound");
249+
250+
inline static constexpr int32_t kKeyMetadataFieldId = 131;
244251
inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
245-
131, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
252+
kKeyMetadataFieldId, "key_metadata", binary(), "Encryption key metadata blob");
253+
254+
inline static constexpr int32_t kSplitOffsetsFieldId = 132;
246255
inline static const SchemaField kSplitOffsets = SchemaField::MakeOptional(
247-
132, "split_offsets",
248-
std::make_shared<ListType>(SchemaField::MakeRequired(
249-
133, std::string(ListType::kElementName), iceberg::int64())),
256+
kSplitOffsetsFieldId, "split_offsets",
257+
list(SchemaField::MakeRequired(133, std::string(ListType::kElementName), int64())),
250258
"Splittable offsets");
259+
260+
inline static constexpr int32_t kEqualityIdsFieldId = 135;
251261
inline static const SchemaField kEqualityIds = SchemaField::MakeOptional(
252-
135, "equality_ids",
253-
std::make_shared<ListType>(SchemaField::MakeRequired(
254-
136, std::string(ListType::kElementName), iceberg::int32())),
262+
kEqualityIdsFieldId, "equality_ids",
263+
list(SchemaField::MakeRequired(136, std::string(ListType::kElementName), int32())),
255264
"Equality comparison field IDs");
256-
inline static const SchemaField kSortOrderId =
257-
SchemaField::MakeOptional(140, "sort_order_id", iceberg::int32(), "Sort order ID");
258-
inline static const SchemaField kFirstRowId = SchemaField::MakeOptional(
259-
142, "first_row_id", iceberg::int64(), "Starting row ID to assign to new rows");
265+
266+
inline static constexpr int32_t kSortOrderIdFieldId = 140;
267+
inline static const SchemaField kSortOrderId = SchemaField::MakeOptional(
268+
kSortOrderIdFieldId, "sort_order_id", int32(), "Sort order ID");
269+
270+
inline static constexpr int32_t kFirstRowIdFieldId = 142;
271+
inline static const SchemaField kFirstRowId =
272+
SchemaField::MakeOptional(kFirstRowIdFieldId, "first_row_id", int64(),
273+
"Starting row ID to assign to new rows");
274+
275+
inline static constexpr int32_t kReferencedDataFileFieldId = 143;
260276
inline static const SchemaField kReferencedDataFile = SchemaField::MakeOptional(
261-
143, "referenced_data_file", iceberg::string(),
277+
kReferencedDataFileFieldId, "referenced_data_file", string(),
262278
"Fully qualified location (URI with FS scheme) of a data file that all deletes "
263279
"reference");
280+
281+
inline static constexpr int32_t kContentOffsetFieldId = 144;
264282
inline static const SchemaField kContentOffset =
265-
SchemaField::MakeOptional(144, "content_offset", iceberg::int64(),
283+
SchemaField::MakeOptional(kContentOffsetFieldId, "content_offset", int64(),
266284
"The offset in the file where the content starts");
285+
286+
inline static constexpr int32_t kContentSizeFieldId = 145;
267287
inline static const SchemaField kContentSize =
268-
SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
288+
SchemaField::MakeOptional(kContentSizeFieldId, "content_size_in_bytes", int64(),
269289
"The length of referenced content stored in the file");
270290

271291
bool operator==(const DataFile& other) const = default;
@@ -298,16 +318,24 @@ struct ICEBERG_EXPORT ManifestEntry {
298318
/// File path, partition tuple, metrics, ...
299319
std::shared_ptr<DataFile> data_file;
300320

321+
inline static constexpr int32_t kStatusFieldId = 0;
301322
inline static const SchemaField kStatus =
302-
SchemaField::MakeRequired(0, "status", iceberg::int32());
323+
SchemaField::MakeRequired(kStatusFieldId, "status", int32());
324+
325+
inline static constexpr int32_t kSnapshotIdFieldId = 1;
303326
inline static const SchemaField kSnapshotId =
304-
SchemaField::MakeOptional(1, "snapshot_id", iceberg::int64());
327+
SchemaField::MakeOptional(kSnapshotIdFieldId, "snapshot_id", int64());
328+
305329
inline static const int32_t kDataFileFieldId = 2;
306330
inline static const std::string kDataFileField = "data_file";
331+
332+
inline static constexpr int32_t kSequenceNumberFieldId = 3;
307333
inline static const SchemaField kSequenceNumber =
308-
SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
309-
inline static const SchemaField kFileSequenceNumber =
310-
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
334+
SchemaField::MakeOptional(kSequenceNumberFieldId, "sequence_number", int64());
335+
336+
inline static constexpr int32_t kFileSequenceNumberFieldId = 4;
337+
inline static const SchemaField kFileSequenceNumber = SchemaField::MakeOptional(
338+
kFileSequenceNumberFieldId, "file_sequence_number", int64());
311339

312340
/// \brief Check if this manifest entry is alive (i.e. not deleted).
313341
constexpr bool IsAlive() const {

src/iceberg/manifest/manifest_list.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,24 @@
1919

2020
#include "iceberg/manifest/manifest_list.h"
2121

22-
#include "iceberg/schema.h"
22+
#include <memory>
23+
24+
#include "iceberg/type.h"
2325

2426
namespace iceberg {
2527

26-
const StructType& PartitionFieldSummary::Type() {
27-
static const StructType kInstance{{
28+
const std::shared_ptr<StructType>& PartitionFieldSummary::Type() {
29+
static const auto kInstance = std::make_shared<StructType>(std::vector<SchemaField>{
2830
PartitionFieldSummary::kContainsNull,
2931
PartitionFieldSummary::kContainsNaN,
3032
PartitionFieldSummary::kLowerBound,
3133
PartitionFieldSummary::kUpperBound,
32-
}};
34+
});
3335
return kInstance;
3436
}
3537

36-
const std::shared_ptr<Schema>& ManifestFile::Type() {
37-
static const auto kInstance = std::make_shared<Schema>(std::vector<SchemaField>{
38+
const std::shared_ptr<StructType>& ManifestFile::Type() {
39+
static const auto kInstance = std::make_shared<StructType>(std::vector<SchemaField>{
3840
kManifestPath,
3941
kManifestLength,
4042
kPartitionSpecId,

src/iceberg/manifest/manifest_list.h

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
6969

7070
bool operator==(const PartitionFieldSummary& other) const = default;
7171

72-
static const StructType& Type();
72+
static const std::shared_ptr<StructType>& Type();
7373
};
7474

7575
/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
@@ -153,51 +153,86 @@ struct ICEBERG_EXPORT ManifestFile {
153153
/// \brief Checks if this manifest file contains entries with DELETED status
154154
bool has_deleted_files() const { return deleted_files_count.value_or(1) > 0; }
155155

156+
inline static const int32_t kManifestPathFieldId = 500;
156157
inline static const SchemaField kManifestPath = SchemaField::MakeRequired(
157-
500, "manifest_path", iceberg::string(), "Location URI with FS scheme");
158+
kManifestPathFieldId, "manifest_path", string(), "Location URI with FS scheme");
159+
160+
inline static const int32_t kManifestLengthFieldId = 501;
158161
inline static const SchemaField kManifestLength = SchemaField::MakeRequired(
159-
501, "manifest_length", iceberg::int64(), "Total file size in bytes");
162+
kManifestLengthFieldId, "manifest_length", int64(), "Total file size in bytes");
163+
164+
inline static const int32_t kPartitionSpecIdFieldId = 502;
160165
inline static const SchemaField kPartitionSpecId = SchemaField::MakeRequired(
161-
502, "partition_spec_id", iceberg::int32(), "Spec ID used to write");
166+
kPartitionSpecIdFieldId, "partition_spec_id", int32(), "Spec ID used to write");
167+
168+
inline static const int32_t kContentFieldId = 517;
162169
inline static const SchemaField kContent = SchemaField::MakeOptional(
163-
517, "content", iceberg::int32(), "Contents of the manifest: 0=data, 1=deletes");
170+
kContentFieldId, "content", int32(), "Contents of the manifest: 0=data, 1=deletes");
171+
172+
inline static const int32_t kSequenceNumberFieldId = 515;
164173
inline static const SchemaField kSequenceNumber =
165-
SchemaField::MakeOptional(515, "sequence_number", iceberg::int64(),
174+
SchemaField::MakeOptional(kSequenceNumberFieldId, "sequence_number", int64(),
166175
"Sequence number when the manifest was added");
176+
177+
inline static const int32_t kMinSequenceNumberFieldId = 516;
167178
inline static const SchemaField kMinSequenceNumber =
168-
SchemaField::MakeOptional(516, "min_sequence_number", iceberg::int64(),
179+
SchemaField::MakeOptional(kMinSequenceNumberFieldId, "min_sequence_number", int64(),
169180
"Lowest sequence number in the manifest");
170-
inline static const SchemaField kAddedSnapshotId = SchemaField::MakeRequired(
171-
503, "added_snapshot_id", iceberg::int64(), "Snapshot ID that added the manifest");
181+
182+
inline static const int32_t kAddedSnapshotIdFieldId = 503;
183+
inline static const SchemaField kAddedSnapshotId =
184+
SchemaField::MakeRequired(kAddedSnapshotIdFieldId, "added_snapshot_id", int64(),
185+
"Snapshot ID that added the manifest");
186+
187+
inline static const int32_t kAddedFilesCountFieldId = 504;
172188
inline static const SchemaField kAddedFilesCount = SchemaField::MakeOptional(
173-
504, "added_files_count", iceberg::int32(), "Added entry count");
174-
inline static const SchemaField kExistingFilesCount = SchemaField::MakeOptional(
175-
505, "existing_files_count", iceberg::int32(), "Existing entry count");
189+
kAddedFilesCountFieldId, "added_files_count", int32(), "Added entry count");
190+
191+
inline static const int32_t kExistingFilesCountFieldId = 505;
192+
inline static const SchemaField kExistingFilesCount =
193+
SchemaField::MakeOptional(kExistingFilesCountFieldId, "existing_files_count",
194+
int32(), "Existing entry count");
195+
196+
inline static const int32_t kDeletedFilesCountFieldId = 506;
176197
inline static const SchemaField kDeletedFilesCount = SchemaField::MakeOptional(
177-
506, "deleted_files_count", iceberg::int32(), "Deleted entry count");
198+
kDeletedFilesCountFieldId, "deleted_files_count", int32(), "Deleted entry count");
199+
200+
inline static const int32_t kAddedRowsCountFieldId = 512;
178201
inline static const SchemaField kAddedRowsCount = SchemaField::MakeOptional(
179-
512, "added_rows_count", iceberg::int64(), "Added rows count");
202+
kAddedRowsCountFieldId, "added_rows_count", int64(), "Added rows count");
203+
204+
inline static const int32_t kExistingRowsCountFieldId = 513;
180205
inline static const SchemaField kExistingRowsCount = SchemaField::MakeOptional(
181-
513, "existing_rows_count", iceberg::int64(), "Existing rows count");
206+
kExistingRowsCountFieldId, "existing_rows_count", int64(), "Existing rows count");
207+
208+
inline static const int32_t kDeletedRowsCountFieldId = 514;
182209
inline static const SchemaField kDeletedRowsCount = SchemaField::MakeOptional(
183-
514, "deleted_rows_count", iceberg::int64(), "Deleted rows count");
210+
kDeletedRowsCountFieldId, "deleted_rows_count", int64(), "Deleted rows count");
211+
212+
inline static const int32_t kPartitionSummaryFieldId = 507;
184213
inline static const SchemaField kPartitions = SchemaField::MakeOptional(
185-
507, "partitions",
186-
std::make_shared<ListType>(SchemaField::MakeRequired(
187-
508, std::string(ListType::kElementName),
188-
struct_(
189-
{PartitionFieldSummary::kContainsNull, PartitionFieldSummary::kContainsNaN,
190-
PartitionFieldSummary::kLowerBound, PartitionFieldSummary::kUpperBound}))),
214+
kPartitionSummaryFieldId, "partitions",
215+
list(SchemaField::MakeRequired(508, std::string(ListType::kElementName),
216+
struct_({
217+
PartitionFieldSummary::kContainsNull,
218+
PartitionFieldSummary::kContainsNaN,
219+
PartitionFieldSummary::kLowerBound,
220+
PartitionFieldSummary::kUpperBound,
221+
}))),
191222
"Summary for each partition");
223+
224+
inline static const int32_t kKeyMetadataFieldId = 519;
192225
inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
193-
519, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
226+
kKeyMetadataFieldId, "key_metadata", binary(), "Encryption key metadata blob");
227+
228+
inline static const int32_t kFirstRowIdFieldId = 520;
194229
inline static const SchemaField kFirstRowId = SchemaField::MakeOptional(
195-
520, "first_row_id", iceberg::int64(),
230+
kFirstRowIdFieldId, "first_row_id", int64(),
196231
"Starting row ID to assign to new rows in ADDED data files");
197232

198233
bool operator==(const ManifestFile& other) const = default;
199234

200-
static const std::shared_ptr<Schema>& Type();
235+
static const std::shared_ptr<StructType>& Type();
201236
};
202237

203238
/// Snapshots are embedded in table metadata, but the list of manifests for a snapshot is

0 commit comments

Comments
 (0)