From c01bccfdd2e5df8b372a923256acee5b4c83cbbf Mon Sep 17 00:00:00 2001 From: shaojun Date: Wed, 19 Nov 2025 23:28:48 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix=20:=20=E4=BF=AE=E5=A4=8Dtrino=20?= =?UTF-8?q?=E5=86=99=E5=85=A5=20paimon=20=E5=88=86=E5=8C=BA=E8=A1=A8=20?= =?UTF-8?q?=E6=8A=A5=EF=BC=9A=20Index=201=20out=20of=20bounds=20for=20leng?= =?UTF-8?q?th=201=20=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FixedBucketTableShuffleFunction.java | 65 +++++++++++++++---- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java b/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java index 839ad0c..9632faf 100644 --- a/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java +++ b/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java @@ -44,18 +44,25 @@ public class FixedBucketTableShuffleFunction implements BucketFunction { private final int bucketCount; private final boolean isRowId; private final ThreadLocal projectionContext; + private final TableSchema schema; + private final List partitionKeys; // 🔧 新增:保存 partition keys public FixedBucketTableShuffleFunction( List partitionChannelTypes, TrinoPartitioningHandle partitioningHandle, int workerCount) { - TableSchema schema = partitioningHandle.getOriginalSchema(); + this.schema = partitioningHandle.getOriginalSchema(); + this.partitionKeys = schema.partitionKeys(); // 🔧 获取 partition keys + + // 🔧 关键修改:使用 partition keys 而不是 primary keys this.projectionContext = ThreadLocal.withInitial( () -> CodeGenUtils.newProjection( - schema.logicalPrimaryKeysType(), schema.primaryKeys())); + schema.logicalPartitionType(), // ✅ 使用 partition type + partitionKeys)); // ✅ 使用 partition keys + this.bucketCount = new CoreOptions(schema.options()).bucket(); this.workerCount = workerCount; this.isRowId = @@ -65,23 +72,59 @@ public FixedBucketTableShuffleFunction( @Override public int getBucket(Page page, int position) { + Page processedPage = page; + + // 处理 RowBlock 的情况 if (isRowId) { RowBlock rowBlock = (RowBlock) page.getBlock(0); try { Method method = RowBlock.class.getDeclaredMethod("getRawFieldBlocks"); method.setAccessible(true); - page = new Page(rowBlock.getPositionCount(), (Block[]) method.invoke(rowBlock)); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); + Block[] rawBlocks = (Block[]) method.invoke(rowBlock); + processedPage = new Page(rowBlock.getPositionCount(), rawBlocks); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException("Failed to extract raw field blocks from RowBlock", e); } } - TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT); - BinaryRow pk = projectionContext.get().apply(trinoRow); + // 🔧 修改验证逻辑:验证 partition keys 数量而不是所有字段 + int expectedBlockCount = partitionKeys.size(); // ✅ 期望 partition keys 数量 + int actualBlockCount = processedPage.getChannelCount(); + + if (actualBlockCount != expectedBlockCount) { + throw new IllegalStateException( + String.format( + "Page block count mismatch: expected %d (partition keys), but got %d. " + + "Partition keys: %s, Schema fields: %s, Primary keys: %s", + expectedBlockCount, + actualBlockCount, + partitionKeys, // ✅ 显示 partition keys + schema.fieldNames(), + schema.primaryKeys())); + } + + // 使用 processedPage 创建 TrinoRow + TrinoRow trinoRow = + new TrinoRow(processedPage.getSingleValuePage(position), RowKind.INSERT); + + // 🔧 修改错误信息:显示 partition keys 相关信息 + BinaryRow pk; + try { + pk = projectionContext.get().apply(trinoRow); + } catch (IndexOutOfBoundsException e) { + throw new RuntimeException( + String.format( + "Failed to extract partition keys from row. " + + "Row field count: %d, Partition keys: %s, " // ✅ 改为 partition + // keys + + "Page block count: %d, Position: %d", + trinoRow.getFieldCount(), + partitionKeys, // ✅ 显示 partition keys + processedPage.getChannelCount(), + position), + e); + } + int bucket = KeyAndBucketExtractor.bucket( KeyAndBucketExtractor.bucketKeyHashCode(pk), bucketCount); From bea8743a7f3f1ceaa4d0e1a37e92854a07f3e1e6 Mon Sep 17 00:00:00 2001 From: shaojun Date: Thu, 20 Nov 2025 10:18:25 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix=20:=20=E4=BF=AE=E5=A4=8Dtrino=20?= =?UTF-8?q?=E5=86=99=E5=85=A5=20paimon=20=E5=88=86=E5=8C=BA=E8=A1=A8=20?= =?UTF-8?q?=E6=8A=A5=EF=BC=9A=20Index=201=20out=20of=20bounds=20for=20leng?= =?UTF-8?q?th=201=20=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FixedBucketTableShuffleFunction.java | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java b/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java index 9632faf..20856bc 100644 --- a/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java +++ b/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java @@ -45,7 +45,7 @@ public class FixedBucketTableShuffleFunction implements BucketFunction { private final boolean isRowId; private final ThreadLocal projectionContext; private final TableSchema schema; - private final List partitionKeys; // 🔧 新增:保存 partition keys + private final List bucketKeys; // 🔧 改为通用的 bucketKeys public FixedBucketTableShuffleFunction( List partitionChannelTypes, @@ -53,15 +53,24 @@ public FixedBucketTableShuffleFunction( int workerCount) { this.schema = partitioningHandle.getOriginalSchema(); - this.partitionKeys = schema.partitionKeys(); // 🔧 获取 partition keys - // 🔧 关键修改:使用 partition keys 而不是 primary keys - this.projectionContext = - ThreadLocal.withInitial( - () -> - CodeGenUtils.newProjection( - schema.logicalPartitionType(), // ✅ 使用 partition type - partitionKeys)); // ✅ 使用 partition keys + // 🔧 关键修改:根据是否分区表选择不同的 keys + List partitionKeys = schema.partitionKeys(); + if (!partitionKeys.isEmpty()) { + // 分区表:使用 partition keys + this.bucketKeys = partitionKeys; + this.projectionContext = + ThreadLocal.withInitial( + () -> + CodeGenUtils.newProjection( + schema.logicalPartitionType(), bucketKeys)); + } else { + // 非分区表:使用 primary keys + this.bucketKeys = schema.primaryKeys(); + this.projectionContext = + ThreadLocal.withInitial( + () -> CodeGenUtils.newProjection(schema.logicalRowType(), bucketKeys)); + } this.bucketCount = new CoreOptions(schema.options()).bucket(); this.workerCount = workerCount; @@ -87,39 +96,39 @@ public int getBucket(Page page, int position) { } } - // 🔧 修改验证逻辑:验证 partition keys 数量而不是所有字段 - int expectedBlockCount = partitionKeys.size(); // ✅ 期望 partition keys 数量 + // 🔧 修改验证逻辑:验证 bucketKeys 数量 + int expectedBlockCount = bucketKeys.size(); int actualBlockCount = processedPage.getChannelCount(); if (actualBlockCount != expectedBlockCount) { throw new IllegalStateException( String.format( - "Page block count mismatch: expected %d (partition keys), but got %d. " - + "Partition keys: %s, Schema fields: %s, Primary keys: %s", + "Page block count mismatch: expected %d (bucket keys), but got %d. " + + "Bucket keys: %s, Partition keys: %s, Primary keys: %s, Schema fields: %s", expectedBlockCount, actualBlockCount, - partitionKeys, // ✅ 显示 partition keys - schema.fieldNames(), - schema.primaryKeys())); + bucketKeys, + schema.partitionKeys(), + schema.primaryKeys(), + schema.fieldNames())); } // 使用 processedPage 创建 TrinoRow TrinoRow trinoRow = new TrinoRow(processedPage.getSingleValuePage(position), RowKind.INSERT); - // 🔧 修改错误信息:显示 partition keys 相关信息 + // 🔧 修改错误信息:显示 bucketKeys 相关信息 BinaryRow pk; try { pk = projectionContext.get().apply(trinoRow); } catch (IndexOutOfBoundsException e) { throw new RuntimeException( String.format( - "Failed to extract partition keys from row. " - + "Row field count: %d, Partition keys: %s, " // ✅ 改为 partition - // keys + "Failed to extract bucket keys from row. " + + "Row field count: %d, Bucket keys: %s, " + "Page block count: %d, Position: %d", trinoRow.getFieldCount(), - partitionKeys, // ✅ 显示 partition keys + bucketKeys, processedPage.getChannelCount(), position), e);