Add native Apache Iceberg table support with CoralCatalog abstraction #556
base: master
Conversation
sumedhsakdeo left a comment
Thanks @aastha25, code looks great. Added some questions/comments, PTAL.
```java
// Iceberg timestamp type - microsecond precision (6 digits)
convertedType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 6);
```
Can we handle timestamp with time zone for completeness?
```suggestion
Types.TimestampType timestampType = (Types.TimestampType) icebergType;
if (timestampType.shouldAdjustToUTC()) {
  // TIMESTAMP WITH TIME ZONE - stores instant in time
  convertedType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 6);
} else {
  // TIMESTAMP - stores local datetime
  convertedType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 6);
}
```
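For context, a minimal sketch (assuming the Iceberg types API is on the classpath) of how the two Iceberg timestamp variants report `shouldAdjustToUTC()`:

```java
import org.apache.iceberg.types.Types;

public class TimestampVariants {
  public static void main(String[] args) {
    // Iceberg models both variants with the same Types.TimestampType class;
    // shouldAdjustToUTC() distinguishes timestamptz from the zone-less timestamp.
    Types.TimestampType withZone = Types.TimestampType.withZone();       // timestamptz
    Types.TimestampType withoutZone = Types.TimestampType.withoutZone(); // timestamp

    System.out.println(withZone.shouldAdjustToUTC());    // true  -> TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)
    System.out.println(withoutZone.shouldAdjustToUTC()); // false -> TIMESTAMP(6)
  }
}
```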
```java
      convertedType = typeFactory.createSqlType(SqlTypeName.DATE);
      break;
    case TIME:
      convertedType = typeFactory.createSqlType(SqlTypeName.TIME);
```
```suggestion
convertedType = typeFactory.createSqlType(SqlTypeName.TIME, 6);
```
```java
      convertedType = typeFactory.createSqlType(SqlTypeName.BINARY, fixedType.length());
      break;
    case BINARY:
      convertedType = typeFactory.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE);
```
Any particular reason why we use VARBINARY over BINARY here? Unlike HiveTypeConverter:

```java
convertedType = dtFactory.createSqlType(SqlTypeName.BINARY);
```
You're right, I'm changing this to match Hive.
coral-common/src/main/java/com/linkedin/coral/common/IcebergTypeConverter.java (conversation resolved)
```java
@Override
public Schema.TableType getJdbcTableType() {
  return dataset.tableType() == TableType.VIEW ? Schema.TableType.VIEW : Schema.TableType.TABLE;
```
```suggestion
return Schema.TableType.TABLE;
```
🤔, with an assert that dataset.tableType() should be TableType.MANAGED_TABLE?
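A minimal sketch of that variant (assuming Guava's Preconditions, which may differ from what the PR actually uses):

```java
import com.google.common.base.Preconditions;

// Sketch of the suggested variant: fail fast if anything other than a
// managed table reaches this code path, then always report TABLE.
@Override
public Schema.TableType getJdbcTableType() {
  Preconditions.checkState(dataset.tableType() == TableType.MANAGED_TABLE,
      "Expected MANAGED_TABLE but got: %s", dataset.tableType());
  return Schema.TableType.TABLE;
}
```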
```java
/**
 * Returns the underlying Iceberg Table for advanced operations.
 *
 * @return Iceberg Table object
```
```suggestion
 * @return org.apache.iceberg.Table
```
```java
 * Utility class to convert Iceberg datasets to Hive Table objects for backward compatibility.
 *
 * <p>This converter creates complete Hive Table objects from Iceberg tables, including schema conversion
 * using {@code HiveSchemaUtil}. While the table object acts as "glue code" for backward compatibility,
 * it populates all standard Hive table metadata to ensure broad compatibility with downstream code paths.
```
Do we expect to exercise this glue code in practice? If so, under what scenarios?
This code path is used to read table properties when parsing the view SQL in ParseTreeBuilder.

Yes, this glue code gets exercised primarily to retrieve eligible table properties on the base tables during the parsing stage in ParseTreeBuilder/HiveFunctionResolver (no schema dependency). Without this glue, we would need larger-scale refactoring in those classes to interpret IcebergTable natively.
```java
// Convert Iceberg schema to Hive columns
try {
  storageDescriptor.setCols(HiveSchemaUtil.convert(icebergTable.schema()));
```
Any particular reason why we choose to set storageDescriptor columns from HiveSchemaUtil.convert(icebergTable.schema()) and not from AvroSchemaUtil.convert(hiveParameters.get("avro.schema.literal"))?
(a) The Iceberg schema is the source of truth, (b) the Avro literal may not always exist or could be stale, and (c) this logic is ported from existing production code paths so that the Iceberg table -> Hive table object conversion is consistent across the stack.

Practically, setting this one way or the other in this specific class has no bearing on view schema resolution.
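For reference, a sketch of the schema-as-source-of-truth conversion (assuming Iceberg's HiveSchemaUtil from the iceberg-hive-metastore module; the helper method name is hypothetical):

```java
import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.iceberg.Table;
import org.apache.iceberg.hive.HiveSchemaUtil;

// Derive the Hive columns directly from the Iceberg schema (the source of
// truth) rather than from a possibly missing or stale avro.schema.literal.
static void setColsFromIcebergSchema(Table icebergTable, StorageDescriptor storageDescriptor) {
  List<FieldSchema> cols = HiveSchemaUtil.convert(icebergTable.schema());
  storageDescriptor.setCols(cols);
}
```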
```java
0, // createTime
0, // lastModifiedTime
0, // retention
```
What are the side-effects of empty metadata here, from owner through retention?
None; we practically only need table properties for backward compatibility with the SQL parser logic in ParseTreeBuilder.
```java
 * @param dbName Database or namespace name
 * @return true if the namespace exists, false otherwise
 */
boolean namespaceExists(String dbName);
```
Let us fix the inconsistencies between namespace, db, and schema.

Let us not use …
```java
 * across different table formats (Hive, Iceberg, etc.).
 *
 * CoralCatalog abstracts away the differences between various table formats
 * and provides a consistent way to access dataset information through
```
Ditto on "Dataset" terminology.
Thanks for the feedback, I have refactored the PR to move away from Dataset.

We now have:
(1) CoralCatalog (the new catalog interface) and HiveMetastoreClient (the old catalog interface) are independent, and both work with Coral for translations. HiveMetastoreClient has been marked as deprecated in favor of CoralCatalog.
(2) getTable() is the API in CoralCatalog. It returns the CoralTable interface. Currently we have two implementations of CoralTable: HiveCoralTable and IcebergCoralTable.
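A minimal sketch of that shape (method names taken from this PR's description; exact signatures and the enum values are assumptions based on what appears elsewhere in this review):

```java
import java.util.Map;

// Sketch of the refactored shape described above; exact signatures may
// differ from the PR, but the method names come from its description.
interface CoralCatalog {
  CoralTable getTable(String namespaceName, String tableName);
}

interface CoralTable {
  String name();
  Map<String, String> properties();
  TableType tableType();
}

enum TableType { MANAGED_TABLE, VIEW } // values seen elsewhere in this review

// Two format-specific implementations wrap the native table objects:
// HiveCoralTable wraps org.apache.hadoop.hive.metastore.api.Table,
// IcebergCoralTable wraps org.apache.iceberg.Table.
```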
coral-common/src/main/java/com/linkedin/coral/common/catalog/DatasetConverter.java (outdated; conversation resolved)
```java
 */
@Override
public TableType tableType() {
  return TableType.fromHiveTableType(table.getTableType());
```
Can you write the spec of conversion between Hive, Iceberg, and Coral representations? How does this expand for more table formats? Ideally we should have a Coral representation that is universal enough and everything can be converted to it. So I would expect methods like toCoralType as opposed to fromHiveType. Underlying table formats should not be hard coded in the universal catalog as well.
TableType.fromHiveTableType has been deleted.

Also, as discussed, the spec of table format -> Coral IR is just schema conversion, which is captured in the TypeConverter class for Hive tables and IcebergTypeConverter for Iceberg tables.
```java
 * @param icebergCoralTable Iceberg coral table to convert
 * @return Hive Table object with complete metadata and schema
 */
public static Table toHiveTable(IcebergCoralTable icebergCoralTable) {
```
Based on offline discussion, I understood we got rid of those, but it seems we still leverage methods that hardcode the table formats. This is not good for extensibility.
Can we eliminate the

```java
if (coralCatalog != null) {
  ...
} else {
  ...
}
```

pattern that we are currently using everywhere, and use some adapter class instead?
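One possible shape for such an adapter (a sketch; the class name is hypothetical, the HiveCoralTable constructor is assumed, and only HiveMetastoreClient.getTable(db, table) is taken from the existing interface):

```java
// Hypothetical adapter: expose the legacy metastore client through the new
// CoralCatalog interface so call sites depend on a single abstraction
// instead of branching on which client is non-null.
public class HiveMscCoralCatalogAdapter implements CoralCatalog {
  private final HiveMetastoreClient msc;

  public HiveMscCoralCatalogAdapter(HiveMetastoreClient msc) {
    this.msc = msc;
  }

  @Override
  public CoralTable getTable(String namespaceName, String tableName) {
    org.apache.hadoop.hive.metastore.api.Table hiveTable = msc.getTable(namespaceName, tableName);
    return hiveTable == null ? null : new HiveCoralTable(hiveTable); // constructor assumed
  }
}
```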
There are quite a few overlapping wrappers; the layering is conceptually unclear. Can we simplify this and merge a few classes?
```java
 * @param namespaceName Namespace (database) name
 * @return true if the namespace exists, false otherwise
 */
boolean namespaceExists(String namespaceName);
```
Inconsistency between namespace and schema elsewhere.

I have considered a few design options and this seems to make the most sense:

```java
interface CoralTable extends ScannableTable
class HiveTable implements CoralTable
class IcebergTable implements CoralTable
```
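For context, a compilable sketch of what that hierarchy inherits from Calcite (the empty interface body is illustrative):

```java
import org.apache.calcite.schema.ScannableTable;

// Extending ScannableTable means every CoralTable picks up Calcite's Table
// contract (getRowType, getStatistic, getJdbcTableType, ...) plus
// ScannableTable.scan(DataContext) for row enumeration.
public interface CoralTable extends ScannableTable {
  // format-agnostic additions (name, properties, ...) would go here
}
```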
```java
assertEquals(timestampType.getPrecision(), 6, "Timestamp should have precision 6 (microseconds)");
}
```
I found here that the max precision of TIMESTAMP in Calcite is 3. I also wrote a similar test in my local and it failed for me. Curious how it worked here. Could you clarify which particular RelDataTypeFactory is being picked up in this test?
OK. I checked out the PR and found it was HiveTypeSystem, which has a max precision of 9.
Discussed offline. The motivation of the above was to avoid duplicating implementation layers. To properly decouple the two, we would need a standalone Coral type system that models schema and type metadata independently. That type system has now been introduced in #558, which can serve as the foundation for adopting that approach.
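For reference, a minimal sketch of why the precision assertion passes (assuming Coral's HiveTypeSystem, whose max TIMESTAMP precision is 9 per the comment above; its import is omitted here):

```java
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class TimestampPrecisionCheck {
  public static void main(String[] args) {
    // SqlTypeFactoryImpl clamps the requested precision to
    // typeSystem.getMaxPrecision(TIMESTAMP). With Calcite's default type
    // system the cap is 3, so TIMESTAMP(6) silently becomes TIMESTAMP(3);
    // with HiveTypeSystem (cap 9) the requested precision 6 is preserved.
    RelDataTypeFactory factory = new SqlTypeFactoryImpl(new HiveTypeSystem());
    RelDataType ts = factory.createSqlType(SqlTypeName.TIMESTAMP, 6);
    System.out.println(ts.getPrecision()); // 6
  }
}
```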
```java
 * @param tableName Table name
 * @return CoralTable object representing the table, or null if not found
 */
CoralTable getTable(String namespaceName, String tableName);
```
namespaceName -> namespace.
```java
 * This interface provides a common way to access table metadata regardless
 * of the underlying table format (Hive, Iceberg, etc.).
 *
 * This abstraction is used by Calcite integration layer to dispatch to
```
Let us not mention implementation details (e.g., Calcite) explicitly in the Coral common module. Can you check the rest of the PR?
```java
 *
 * @return Fully qualified table name
 */
String name();
```
Does the method name need to reflect it is a fully qualified table name? Also, is this method used anywhere? Let us add things only when necessary.
```java
 *
 * Used by Calcite integration to dispatch to HiveTable.
 */
public class HiveCoralTable implements CoralTable {
```
There are table implementations in common and table implementations in catalog. What is the basis for defining in each?
Does this need to be implemented as a plugin to avoid classpath issues? Also note that both Hive and Iceberg co-exist in the same module, which is not quite normal.
```java
final CoralDataType coralType = HiveToCoralTypeConverter.convert(typeInfo);
```
Example of package inconsistency. Utility method for this implementation is in the common package.
```java
public org.apache.iceberg.Table getIcebergTable() {
  return table;
}
```
Ideally you should not need this, and it might indicate a leak in the API. Can we avoid it?
```gradle
compile deps.'hadoop'.'hadoop-common'

// LinkedIn Iceberg dependencies
```
Another reason why this needs to be a plugin. This should integrate with OSS Iceberg too.
```java
// Convert Iceberg coral table to minimal Hive Table for backward compatibility
// This is needed because downstream code (ParseTreeBuilder, HiveFunctionResolver)
// expects a Hive Table object for Dali UDF resolution
```
Can you elaborate more? Why not use CoralTable there?
```java
 * <li>Storage descriptor with SerDe info (for compatibility)</li>
 * </ul>
 */
public class IcebergHiveTableConverter {
```
The point of this change is not to do this anymore.
```java
topSqlNode.accept(new RegisterDynamicFunctionsForTypeDerivation());

TypeDerivationUtil typeDerivationUtil = new TypeDerivationUtil(toRelConverter.getSqlValidator(), topSqlNode);
operatorTransformerList = SqlCallTransformers.of(new FromUtcTimestampOperatorTransformer(typeDerivationUtil),
    new GenericProjectTransformer(typeDerivationUtil), new NamedStructToCastTransformer(typeDerivationUtil),
    new ConcatOperatorTransformer(typeDerivationUtil), new SubstrOperatorTransformer(typeDerivationUtil),
    new CastOperatorTransformer(typeDerivationUtil), new UnionSqlCallTransformer(typeDerivationUtil));
```
Refactor the code to avoid repetition?
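One way to do that (a sketch; the helper name is hypothetical, and it assumes the duplicated call sites build exactly this transformer list):

```java
// Hypothetical helper so each call site builds the shared transformer list
// through one method instead of repeating the SqlCallTransformers.of(...) block.
private static SqlCallTransformers standardTransformers(TypeDerivationUtil typeDerivationUtil) {
  return SqlCallTransformers.of(new FromUtcTimestampOperatorTransformer(typeDerivationUtil),
      new GenericProjectTransformer(typeDerivationUtil), new NamedStructToCastTransformer(typeDerivationUtil),
      new ConcatOperatorTransformer(typeDerivationUtil), new SubstrOperatorTransformer(typeDerivationUtil),
      new CastOperatorTransformer(typeDerivationUtil), new UnionSqlCallTransformer(typeDerivationUtil));
}

// Call site:
// operatorTransformerList = standardTransformers(typeDerivationUtil);
```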
```java
 */
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
  // Stage 1: Iceberg → Coral
```
Nit: Step.
```java
 *
 * Copied structure from TypeConverter for consistency.
 */
public class IcebergTypeConverter {
```
I do not see why this is required. Could you explain?
```java
 * @param msc Hive metastore client for Hive-specific access (can be null if coralCatalog is provided)
 * @param dbName Database name (must not be null)
 */
HiveDbSchema(CoralCatalog coralCatalog, HiveMetastoreClient msc, @Nonnull String dbName) {
```
Can we introduce the coral classes here, depending only on CoralCatalog, and mark the Hive ones as deprecated?
What changes are proposed in this pull request, and why are they necessary?
Summary
This PR introduces native Apache Iceberg table support to Coral, enabling direct schema conversion from Iceberg to Calcite's RelDataType without lossy intermediate conversions through Hive's type system. The implementation preserves Iceberg-specific type semantics including timestamp precision and explicit nullability.
Key architectural decision: HiveMetastoreClient remains unchanged and does NOT extend CoralCatalog. Integration classes use composition (storing both instances) with runtime dispatch.
New Components (coral-common/src/main/java/com/linkedin/coral/common/catalog/)
- CoralCatalog: Format-agnostic catalog interface with getTable(), getAllTables(), namespaceExists()
- CoralTable: Unified table metadata interface (name(), properties(), tableType())
- HiveCoralTable / IcebergCoralTable: Implementations wrapping native Hive/Iceberg table objects
Iceberg Integration
- IcebergTable: Calcite ScannableTable implementation for Iceberg tables
- IcebergTypeConverter: Converts Iceberg Schema → Calcite RelDataType with precision preservation (see the sketch below)
- IcebergHiveTableConverter: Backward compatibility bridge for UDF resolution (converts Iceberg → Hive table object)
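A sketch of the conversion shape, assembled from the hunks quoted in this review (not the complete converter; struct/list/map/decimal cases are omitted and the method name is illustrative):

```java
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Assembled from the review hunks above; not the full converter.
static RelDataType convertPrimitive(Type icebergType, RelDataTypeFactory typeFactory) {
  switch (icebergType.typeId()) {
    case DATE:
      return typeFactory.createSqlType(SqlTypeName.DATE);
    case TIME:
      return typeFactory.createSqlType(SqlTypeName.TIME, 6); // precision 6 suggested in review
    case TIMESTAMP:
      // Iceberg timestamps are microsecond precision; the review suggests
      // splitting on shouldAdjustToUTC() for the "with time zone" variant
      return ((Types.TimestampType) icebergType).shouldAdjustToUTC()
          ? typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 6)
          : typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 6);
    case FIXED:
      return typeFactory.createSqlType(SqlTypeName.BINARY, ((Types.FixedType) icebergType).length());
    case BINARY:
      return typeFactory.createSqlType(SqlTypeName.BINARY); // changed from VARBINARY per review
    default:
      throw new UnsupportedOperationException("Not covered in this sketch: " + icebergType);
  }
}
```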
Integration Pattern
- HiveSchema, HiveDbSchema, ToRelConverter: Store both CoralCatalog and HiveMetastoreClient instances
- If coralCatalog != null, use the unified path; else if msc != null, use the Hive-only path (see the sketch below)
- HiveMetastoreClient and HiveMscAdapter marked @deprecated (still functional; prefer CoralCatalog)
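A minimal sketch of that dual-path dispatch (hedged: the class and method shape here are illustrative, not the exact PR code):

```java
// Illustrative composition + runtime dispatch; exact names differ in the PR.
class DualPathSchema {
  private final CoralCatalog coralCatalog;   // new, format-agnostic path
  private final HiveMetastoreClient msc;     // legacy, Hive-only path (deprecated)

  DualPathSchema(CoralCatalog coralCatalog, HiveMetastoreClient msc) {
    this.coralCatalog = coralCatalog;
    this.msc = msc;
  }

  Object resolveTable(String dbName, String tableName) {
    if (coralCatalog != null) {
      return coralCatalog.getTable(dbName, tableName);  // unified: Hive + Iceberg
    } else if (msc != null) {
      return msc.getTable(dbName, tableName);           // Hive-only fallback
    }
    throw new IllegalStateException("No catalog configured");
  }
}
```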
How Reviewers Should Read This

Start here:
- CoralCatalog.java - New abstraction layer interface
- CoralTable.java - Unified table metadata interface
- IcebergCoralTable.java - How Iceberg tables are wrapped
- IcebergTypeConverter.java - Core schema conversion logic
Then review integration:
- HiveDbSchema.java - Dispatch logic based on CoralTable type (Iceberg vs Hive)
- IcebergTable.java - Calcite integration
- ToRelConverter.java - Dual-path support (CoralCatalog vs HiveMetastoreClient)
- HiveMetastoreClient.java - Backward compatibility
Test:
- IcebergTableConverterTest.java - End-to-end Iceberg conversion test

How was this patch tested?
- New and existing tests pass
- Integration tests: WIP