-
Notifications
You must be signed in to change notification settings - Fork 75
feat: enhance ManifestReader with projection and filtering support #431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
537a136 to
d4f7c24
Compare
d4f7c24 to
81b1732
Compare
|
|
||
| namespace { | ||
|
|
||
| #define PARSE_PRIMITIVE_FIELD(item, array_view, type) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Github is not clever enough to figure out changes in this file. Most parts come from combining manifest_reader.cc and manifest_reader_internal.cc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change these macros to template functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you. But I prefer to do it in a separate PR.
- Consolidate `ManifestReader` implementation into `manifest_reader.cc` and remove `manifest_reader_internal.cc`. - Implement fluent API for column selection, partition filtering, and row filtering. - Support lazy initialization of the underlying Avro reader. - Add various filtering support for entries.
81b1732 to
5b11c46
Compare
|
|
||
| /// \brief Add stats columns to the column list if needed. | ||
| std::vector<std::string> WithStatsColumns( | ||
| const std::vector<std::string>& columns) const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a static function in ManifestReader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be a function declaration without implementation. I guess there is a careless mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is due to a half-way refactoring that I forgot to delete it from the original place.
| class ICEBERG_EXPORT ManifestReader { | ||
| public: | ||
| /// \brief Special value to select all columns from manifest files. | ||
| inline static const std::vector<std::string> kAllColumns{"*"}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need to be a vector, and it's also defined in Schema::Select
| if (std::ranges::all_of(ManifestReader::kAllColumns, | ||
| [&columns](const std::string& col) { | ||
| return std::ranges::find(columns, col) != columns.cend(); | ||
| })) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (std::ranges::all_of(ManifestReader::kAllColumns, | |
| [&columns](const std::string& col) { | |
| return std::ranges::find(columns, col) != columns.cend(); | |
| })) { | |
| if (std::ranges::find(columns, ManifestReader::kAllColumns) != columns.cend()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use std::ranges::contains instead.
| if (std::ranges::all_of( | ||
| ManifestReader::kAllColumns, | ||
| [&selected](const std::string& col) { return selected.contains(col); })) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (std::ranges::all_of( | |
| ManifestReader::kAllColumns, | |
| [&selected](const std::string& col) { return selected.contains(col); })) { | |
| return false; | |
| } | |
| if (selected.contains(ManifestReader::kAllColumns)) { | |
| return false; | |
| } |
If kAllColumns is a string
| /// present | ||
| std::optional<int64_t> content_size_in_bytes; | ||
|
|
||
| inline static constexpr int32_t kContentFieldId = 134; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove inline on static constexpr.
|
|
||
| const StructType& PartitionFieldSummary::Type() { | ||
| static const StructType kInstance{{ | ||
| const std::shared_ptr<StructType>& PartitionFieldSummary::Type() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to change the return type from const T& to const std::shared_ptr<T>&?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now it is a singleton that can be reused.
|
|
||
| namespace { | ||
|
|
||
| #define PARSE_PRIMITIVE_FIELD(item, array_view, type) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change these macros to template functions?
| if (std::ranges::all_of(ManifestReader::kAllColumns, | ||
| [&columns](const std::string& col) { | ||
| return std::ranges::find(columns, col) != columns.cend(); | ||
| })) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use std::ranges::contains instead.
|
|
||
| Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx, | ||
| std::vector<ManifestEntry>& manifest_entries) { | ||
| if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use switch instead of multiple if?
| return manifest_entries; | ||
| } | ||
|
|
||
| const std::unordered_set<std::string> kStatsColumns = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think std::vector<std::string> is enough.
| if (result.has_value()) { | ||
| internal::ArrowArrayGuard array_guard(&result.value()); | ||
| ICEBERG_ASSIGN_OR_RAISE( | ||
| auto parse_result, ParseManifestList(&arrow_schema, &result.value(), *schema_)); | ||
| manifest_files.insert(manifest_files.end(), | ||
| std::make_move_iterator(parse_result.begin()), | ||
| std::make_move_iterator(parse_result.end())); | ||
| } else { | ||
| // eof | ||
| break; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (result.has_value()) { | |
| internal::ArrowArrayGuard array_guard(&result.value()); | |
| ICEBERG_ASSIGN_OR_RAISE( | |
| auto parse_result, ParseManifestList(&arrow_schema, &result.value(), *schema_)); | |
| manifest_files.insert(manifest_files.end(), | |
| std::make_move_iterator(parse_result.begin()), | |
| std::make_move_iterator(parse_result.end())); | |
| } else { | |
| // eof | |
| break; | |
| } | |
| if (!result.has_value()) { | |
| // eof | |
| break; | |
| } | |
| internal::ArrowArrayGuard array_guard(&result.value()); | |
| ICEBERG_ASSIGN_OR_RAISE( | |
| auto parse_result, ParseManifestList(&arrow_schema, &result.value(), *schema_)); | |
| manifest_files.insert(manifest_files.end(), | |
| std::make_move_iterator(parse_result.begin()), | |
| std::make_move_iterator(parse_result.end())); |
I prefer to write this way, with one less tab.
| } | ||
|
|
||
| bool ManifestReaderImpl::HasPartitionFilter() const { | ||
| return part_filter_ && part_filter_->op() != Expression::Operation::kTrue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
part_filter_ always true
| return part_filter_ && part_filter_->op() != Expression::Operation::kTrue; | |
| return part_filter_->op() != Expression::Operation::kTrue; |
| } | ||
|
|
||
| bool ManifestReaderImpl::HasRowFilter() const { | ||
| return row_filter_ && row_filter_->op() != Expression::Operation::kTrue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return row_filter_ && row_filter_->op() != Expression::Operation::kTrue; | |
| return row_filter_->op() != Expression::Operation::kTrue; |
| /// present | ||
| std::optional<int64_t> content_size_in_bytes; | ||
|
|
||
| inline static constexpr int32_t kContentFieldId = 134; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: constexpr implies inline, maybe remove the inline, not a strong opinion
| if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) { | ||
| return InvalidManifestList("partitions field should be a list."); | ||
| } | ||
| auto view_of_list_iterm = view_of_column->children[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| auto view_of_list_iterm = view_of_column->children[0]; | |
| auto view_of_list_item = view_of_column->children[0]; |
typo?
| auto view_of_list_iterm = view_of_column->children[0]; | ||
| // view_of_list_iterm is struct<PartitionFieldSummary> | ||
| if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { | ||
| return InvalidManifestList("partitions list field should be a list."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return InvalidManifestList("partitions list field should be a list."); | |
| return InvalidManifestList("partitions list item should be a struct."); |
| return manifest_files; | ||
| } | ||
|
|
||
| Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename this to ParsePartitionValues, the current name is a little bit confusing to me.
| class ICEBERG_EXPORT ManifestReader { | ||
| public: | ||
| /// \brief Special value to select all columns from manifest files. | ||
| inline static const std::vector<std::string> kAllColumns{"*"}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this is a vector, a single string should be enough?
| first_row_id_(first_row_id) {} | ||
|
|
||
| ManifestReader& ManifestReaderImpl::Select(const std::vector<std::string>& columns) { | ||
| columns_ = columns; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Java maps user-requested names to immutable Field IDs from the manifest's actual schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do something like this?
ManifestReader& ManifestReaderImpl::Select(const std::vector<std::string>& columns) {
// Validate columns exist in data file schema before storing
for (const auto& col_name : columns) {
if (col_name != kAllColumns) {
ICEBERG_CHECK(DataFile::GetSchemaByName(col_name, case_sensitive_).has_value(),
"Column '{}' not found in data file schema", col_name);
}
}
columns_ = columns;
return *this;
}
| } | ||
|
|
||
| ManifestReader& ManifestReaderImpl::FilterPartitions(std::shared_ptr<Expression> expr) { | ||
| part_filter_ = expr ? std::move(expr) : True::Instance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The java implementation have 'AND': https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java
| } | ||
|
|
||
| ManifestReader& ManifestReaderImpl::FilterRows(std::shared_ptr<Expression> expr) { | ||
| row_filter_ = expr ? std::move(expr) : True::Instance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is missing AND combination comparing with Java.
| if (!row_filter || row_filter->op() == Expression::Operation::kTrue) { | ||
| return false; | ||
| } | ||
| if (columns.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this function return true when stats columns are needed for filtering but NOT all selected. Currently it returns true when stats are NOT all selected, which is backwards. This will cause unnecessary stats projection when stats columns ARE already selected.
Something like below:
bool RequireStatsProjection(const std::shared_ptr<Expression>& row_filter,
const std::vector<std::string>& columns) {
if (!row_filter || row_filter->op() == Expression::Operation::kTrue) {
return false; // No row filter, no stats needed
}
if (columns.empty() || std::ranges::contains(columns, ManifestReader::kAllColumns)) {
return false; // All columns selected, stats already included
}
// Return true if ANY stats column is missing from selection
const std::unordered_set<std::string> selected(columns.cbegin(), columns.cend());
return !std::ranges::all_of(kStatsColumns, [&selected](const std::string& col) {
return selected.contains(col);
});
}
| } | ||
|
|
||
| Result<Evaluator*> ManifestReaderImpl::GetEvaluator() { | ||
| if (!evaluator_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add null checks for row_filter_ and part_filter_ before us?
| class ICEBERG_EXPORT ManifestReader { | ||
| public: | ||
| /// \brief Special value to select all columns from manifest files. | ||
| inline static const std::string kAllColumns = "*"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider to use constexpr std::string_view for better performance?
Refactors the
ManifestReaderimplementation to support advanced data skippingand projection pushdown, and adds full test coverage ported from the Java implementation.
Implementation Changes:
ManifestReaderlogic: Move implementation frommanifest_reader_internal.ccto
manifest_reader.ccand remove the obsolete internal class.Select(),FilterPartitions(), andFilterRows()to supportcolumn projection and expression-based filtering (partition & metrics).
Test Coverage:
manifest_reader_test.cc: PortsTestManifestReaderfrom Java, covering:manifest_reader_stats_test.cc: PortsTestManifestReaderStats, verifying: