diff --git a/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java index 171dfd8..16c442c 100644 --- a/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java +++ b/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java @@ -21,9 +21,11 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fileindex.FileIndexPredicate; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.DeletionFile; @@ -31,6 +33,7 @@ import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.system.ReadOptimizedTable; import org.apache.paimon.trino.catalog.TrinoCatalog; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; @@ -64,6 +67,7 @@ import org.joda.time.DateTimeZone; import java.io.IOException; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -177,14 +181,33 @@ private ConnectorPageSource createPageSource( Split paimonSplit = split.decodeSplit(); Optional> optionalRawFiles = paimonSplit.convertToRawFiles(); if (checkRawFile(optionalRawFiles)) { - FileStoreTable fileStoreTable = (FileStoreTable) table; - boolean readIndex = fileStoreTable.coreOptions().fileIndexReadEnabled(); + FileIO fileIO; + Path location; + CoreOptions coreOptions; + TableSchema schema; + if (table instanceof ReadOptimizedTable readOptimizedTable) { + fileIO = readOptimizedTable.fileIO(); + location = readOptimizedTable.location(); + coreOptions = readOptimizedTable.coreOptions(); + Field wrappedField = ReadOptimizedTable.class.getDeclaredField("wrapped"); + wrappedField.setAccessible(true); + FileStoreTable wrappedValue = + (FileStoreTable) wrappedField.get(readOptimizedTable); + schema = wrappedValue.schema(); + } else if (table instanceof FileStoreTable fileStoreTable) { + fileIO = fileStoreTable.fileIO(); + location = fileStoreTable.location(); + coreOptions = fileStoreTable.coreOptions(); + schema = fileStoreTable.schema(); + } else { + throw new RuntimeException("Unknown table type: " + table.getClass().getName()); + } + boolean readIndex = coreOptions.fileIndexReadEnabled(); Optional> deletionFiles = paimonSplit.deletionFiles(); Optional> indexFiles = readIndex ? paimonSplit.indexFiles() : Optional.empty(); - SchemaManager schemaManager = - new SchemaManager(fileStoreTable.fileIO(), fileStoreTable.location()); + SchemaManager schemaManager = new SchemaManager(fileIO, location); List type = columns.stream() .map(s -> ((TrinoColumnHandle) s).getTrinoType()) @@ -202,9 +225,7 @@ private ConnectorPageSource createPageSource( if (indexFile != null && paimonFilter.isPresent()) { try (FileIndexPredicate fileIndexPredicate = new FileIndexPredicate( - new Path(indexFile.path()), - ((FileStoreTable) table).fileIO(), - rowType)) { + new Path(indexFile.path()), fileIO, rowType)) { if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) { continue; } @@ -215,13 +236,13 @@ private ConnectorPageSource createPageSource( createDataPageSource( rawFile.format(), fileSystem.newInputFile(Location.of(rawFile.path())), - fileStoreTable.coreOptions(), + coreOptions, // map table column name to data column // name, if column does not exist in // data columns, set it to null // columns those set to null will generate // a null vector in orc page - fileStoreTable.schema().id() == rawFile.schemaId() + schema.id() == rawFile.schemaId() ? projectedFields : schemaEvolutionFieldNames( projectedFields, @@ -241,8 +262,7 @@ private ConnectorPageSource createPageSource( deletionFile -> { try { return DeletionVector.read( - fileStoreTable.fileIO(), - deletionFile); + fileIO, deletionFile); } catch (IOException e) { throw new RuntimeException(e); }