Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.IndexFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.trino.catalog.TrinoCatalog;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -64,6 +67,7 @@
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -177,14 +181,33 @@ private ConnectorPageSource createPageSource(
Split paimonSplit = split.decodeSplit();
Optional<List<RawFile>> optionalRawFiles = paimonSplit.convertToRawFiles();
if (checkRawFile(optionalRawFiles)) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
boolean readIndex = fileStoreTable.coreOptions().fileIndexReadEnabled();
FileIO fileIO;
Path location;
CoreOptions coreOptions;
TableSchema schema;
if (table instanceof ReadOptimizedTable readOptimizedTable) {
fileIO = readOptimizedTable.fileIO();
location = readOptimizedTable.location();
coreOptions = readOptimizedTable.coreOptions();
Field wrappedField = ReadOptimizedTable.class.getDeclaredField("wrapped");
wrappedField.setAccessible(true);
FileStoreTable wrappedValue =
(FileStoreTable) wrappedField.get(readOptimizedTable);
schema = wrappedValue.schema();
} else if (table instanceof FileStoreTable fileStoreTable) {
fileIO = fileStoreTable.fileIO();
location = fileStoreTable.location();
coreOptions = fileStoreTable.coreOptions();
schema = fileStoreTable.schema();
} else {
throw new RuntimeException("Unknown table type: " + table.getClass().getName());
}
boolean readIndex = coreOptions.fileIndexReadEnabled();

Optional<List<DeletionFile>> deletionFiles = paimonSplit.deletionFiles();
Optional<List<IndexFile>> indexFiles =
readIndex ? paimonSplit.indexFiles() : Optional.empty();
SchemaManager schemaManager =
new SchemaManager(fileStoreTable.fileIO(), fileStoreTable.location());
SchemaManager schemaManager = new SchemaManager(fileIO, location);
List<Type> type =
columns.stream()
.map(s -> ((TrinoColumnHandle) s).getTrinoType())
Expand All @@ -202,9 +225,7 @@ private ConnectorPageSource createPageSource(
if (indexFile != null && paimonFilter.isPresent()) {
try (FileIndexPredicate fileIndexPredicate =
new FileIndexPredicate(
new Path(indexFile.path()),
((FileStoreTable) table).fileIO(),
rowType)) {
new Path(indexFile.path()), fileIO, rowType)) {
if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) {
continue;
}
Expand All @@ -215,13 +236,13 @@ private ConnectorPageSource createPageSource(
createDataPageSource(
rawFile.format(),
fileSystem.newInputFile(Location.of(rawFile.path())),
fileStoreTable.coreOptions(),
coreOptions,
// map table column name to data column
// name, if column does not exist in
// data columns, set it to null
// columns those set to null will generate
// a null vector in orc page
fileStoreTable.schema().id() == rawFile.schemaId()
schema.id() == rawFile.schemaId()
? projectedFields
: schemaEvolutionFieldNames(
projectedFields,
Expand All @@ -241,8 +262,7 @@ private ConnectorPageSource createPageSource(
deletionFile -> {
try {
return DeletionVector.read(
fileStoreTable.fileIO(),
deletionFile);
fileIO, deletionFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down