From ea2b1030b28be1e5c34c5e765e27f948db8b6df3 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Thu, 22 May 2025 13:53:28 +0800 Subject: [PATCH 1/6] feat: Implement VectorAppender for RunEndEncodedVector --- .../arrow/vector/util/VectorAppender.java | 78 +++++++++++++++++++ .../arrow/vector/util/TestVectorAppender.java | 54 +++++++++++++ 2 files changed, 132 insertions(+) diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index 0dc96a4d4b..8fedca1c48 100644 --- a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -24,13 +24,18 @@ import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BaseIntVector; import org.apache.arrow.vector.BaseLargeVariableWidthVector; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BaseVariableWidthViewVector; +import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.ExtensionTypeVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.compare.TypeEqualsVisitor; import org.apache.arrow.vector.compare.VectorVisitor; @@ -39,6 +44,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility to append two vectors together. */ @@ -698,4 +704,76 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) { deltaVector.getUnderlyingVector().accept(underlyingAppender, null); return targetVector; } + + @Override + public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { + Preconditions.checkArgument( + typeVisitor.equals(deltaVector), + "The vector to append must have the same type as the targetVector being appended"); + + if (deltaVector.getValueCount() == 0) { + return targetVector; // optimization, nothing to append, return + } + + RunEndEncodedVector targetEncodedVector = (RunEndEncodedVector) targetVector; + + final int targetLogicalValueCount = targetEncodedVector.getValueCount(); + + // Append the values vector first. + VectorAppender valueAppender = new VectorAppender(targetEncodedVector.getValuesVector()); + deltaVector.getValuesVector().accept(valueAppender, null); + + // Then append the run-ends vector. + BaseIntVector targetRunEndsVector = (BaseIntVector) targetEncodedVector.getRunEndsVector(); + BaseIntVector deltaRunEndsVector = (BaseIntVector) deltaVector.getRunEndsVector(); + + // Shift the delta run-ends vector in-place before appending. + shiftRunEndsVector( + deltaRunEndsVector, + deltaRunEndsVector.getDataBuffer(), + targetLogicalValueCount, + deltaRunEndsVector.getValueCount()); + + // Append the now-shifted delta run-ends vector to the target. + new VectorAppender(targetRunEndsVector).visit((BaseFixedWidthVector) deltaRunEndsVector, null); + + targetEncodedVector.setValueCount(targetLogicalValueCount + deltaVector.getValueCount()); + + return targetVector; + } + + private void shiftRunEndsVector( + ValueVector toRunEndVector, + ArrowBuf fromRunEndBuffer, + int offset, + int physicalLength) { + ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer(); + if (toRunEndVector instanceof SmallIntVector) { + byte typeWidth = SmallIntVector.TYPE_WIDTH; + for (int i = 0; i < physicalLength; i++) { + toRunEndBuffer.setShort( + (long) i * typeWidth, + fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset); + } + + } else if (toRunEndVector instanceof IntVector) { + byte typeWidth = IntVector.TYPE_WIDTH; + for (int i = 0; i < physicalLength; i++) { + toRunEndBuffer.setInt( + (long) i * typeWidth, + fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset); + } + + } else if (toRunEndVector instanceof BigIntVector) { + byte typeWidth = BigIntVector.TYPE_WIDTH; + for (int i = 0; i < physicalLength; i++) { + toRunEndBuffer.setLong( + (long) i * typeWidth, + fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset); + } + } else { + throw new IllegalArgumentException( + "Run-end vector and must be of type int with size 16, 32, or 64 bits."); + } + } } diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java index 4ee9630a4d..62427162de 100644 --- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java +++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java @@ -36,6 +36,7 @@ import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.TestUtils; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarCharVector; @@ -1025,6 +1026,59 @@ public void testAppendDenseUnionVectorMismatch() { } } + @Test + public void testAppendRunEndEncodedVector() { + final FieldType reeFieldType = FieldType.notNullable(ArrowType.RunEndEncoded.INSTANCE); + final Field runEndsField = + new Field("runEnds", FieldType.notNullable(Types.MinorType.INT.getType()), null); + final Field valuesField = Field.nullable("values", Types.MinorType.INT.getType()); + final List children = Arrays.asList(runEndsField, valuesField); + + final Field targetField = new Field("target", reeFieldType, children); + final Field deltaField = new Field("delta", reeFieldType, children); + try (RunEndEncodedVector target = new RunEndEncodedVector(targetField, allocator, null); + RunEndEncodedVector delta = new RunEndEncodedVector(deltaField, allocator, null)) { + + // populate target + target.allocateNew(); + // data: [1, 1, 2, null, 3, 3, 3] (7 values) + // values: [1, 2, null, 3] + // runEnds: [2, 3, 4, 7] + ValueVectorDataPopulator.setVector((IntVector) target.getValuesVector(), 1, 2, null, 3); + ValueVectorDataPopulator.setVector((IntVector) target.getRunEndsVector(), 2, 3, 4, 7); + target.setValueCount(7); + + // populate delta + delta.allocateNew(); + // data: [3, 4, 4, 5, null, null] (6 values) + // values: [3, 4, 5, null] + // runEnds: [1, 3, 4, 6] + ValueVectorDataPopulator.setVector((IntVector) delta.getValuesVector(), 3, 4, 5, null); + ValueVectorDataPopulator.setVector((IntVector) delta.getRunEndsVector(), 1, 3, 4, 6); + delta.setValueCount(6); + + VectorAppender appender = new VectorAppender(target); + delta.accept(appender, null); + + assertEquals(13, target.getValueCount()); + + final Field expectedField = new Field("expected", reeFieldType, children); + try (RunEndEncodedVector expected = new RunEndEncodedVector(expectedField, allocator, null)) { + expected.allocateNew(); + // expected data: [1, 1, 2, null, 3, 3, 3, 3, 4, 4, 5, null, null] (13 values) + // expected values: [1, 2, null, 3, 3, 4, 5, null] + // expected runEnds: [2, 3, 4, 7, 8, 10, 11, 13] + ValueVectorDataPopulator.setVector( + (IntVector) expected.getValuesVector(), 1, 2, null, 3, 3, 4, 5, null); + ValueVectorDataPopulator.setVector( + (IntVector) expected.getRunEndsVector(), 2, 3, 4, 7, 8, 10, 11, 13); + expected.setValueCount(13); + + assertVectorsEqual(expected, target); + } + } + } + @Test public void testAppendVectorNegative() { final int vectorLength = 10; From 1de7c75a807a8b439a58fc42865ac8c9c35de998 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Sat, 11 Oct 2025 00:23:22 +0800 Subject: [PATCH 2/6] format --- .../apache/arrow/vector/util/VectorAppender.java | 15 ++++----------- .../arrow/vector/util/TestVectorAppender.java | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index 8fedca1c48..68bbaf0254 100644 --- a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -32,7 +32,6 @@ import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.ExtensionTypeVector; -import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.NullVector; import org.apache.arrow.vector.SmallIntVector; @@ -743,33 +742,27 @@ public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { } private void shiftRunEndsVector( - ValueVector toRunEndVector, - ArrowBuf fromRunEndBuffer, - int offset, - int physicalLength) { + ValueVector toRunEndVector, ArrowBuf fromRunEndBuffer, int offset, int physicalLength) { ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer(); if (toRunEndVector instanceof SmallIntVector) { byte typeWidth = SmallIntVector.TYPE_WIDTH; for (int i = 0; i < physicalLength; i++) { toRunEndBuffer.setShort( - (long) i * typeWidth, - fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset); + (long) i * typeWidth, fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset); } } else if (toRunEndVector instanceof IntVector) { byte typeWidth = IntVector.TYPE_WIDTH; for (int i = 0; i < physicalLength; i++) { toRunEndBuffer.setInt( - (long) i * typeWidth, - fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset); + (long) i * typeWidth, fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset); } } else if (toRunEndVector instanceof BigIntVector) { byte typeWidth = BigIntVector.TYPE_WIDTH; for (int i = 0; i < physicalLength; i++) { toRunEndBuffer.setLong( - (long) i * typeWidth, - fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset); + (long) i * typeWidth, fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset); } } else { throw new IllegalArgumentException( diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java index 62427162de..3b457d6af5 100644 --- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java +++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java @@ -36,7 +36,6 @@ import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.LargeVarCharVector; -import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.TestUtils; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarCharVector; @@ -48,6 +47,7 @@ import org.apache.arrow.vector.complex.FixedSizeListVector; import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.holders.NullableBigIntHolder; From 1c46594eeffb7d1aeb86770c30cff54f80df8e4b Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 29 Oct 2025 20:34:55 +0800 Subject: [PATCH 3/6] polish error message --- .../main/java/org/apache/arrow/vector/util/VectorAppender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index 68bbaf0254..a6ad16ce82 100644 --- a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -708,7 +708,7 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) { public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { Preconditions.checkArgument( typeVisitor.equals(deltaVector), - "The vector to append must have the same type as the targetVector being appended"); + "The deltaVector to append must have the same type as the targetVector"); if (deltaVector.getValueCount() == 0) { return targetVector; // optimization, nothing to append, return From ad9202d3ea464b26c8c5c640bd7583a069c847e9 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 29 Oct 2025 22:24:35 +0800 Subject: [PATCH 4/6] shiftAndAppendRunEnds --- .../arrow/vector/util/VectorAppender.java | 56 ++++++++++++++----- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index a6ad16ce82..e7c0d11cb9 100644 --- a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -725,44 +725,72 @@ public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { // Then append the run-ends vector. BaseIntVector targetRunEndsVector = (BaseIntVector) targetEncodedVector.getRunEndsVector(); BaseIntVector deltaRunEndsVector = (BaseIntVector) deltaVector.getRunEndsVector(); + appendRunEndsVector(targetRunEndsVector, deltaRunEndsVector, targetLogicalValueCount); - // Shift the delta run-ends vector in-place before appending. - shiftRunEndsVector( - deltaRunEndsVector, + targetEncodedVector.setValueCount(targetLogicalValueCount + deltaVector.getValueCount()); + return targetVector; + } + + private void appendRunEndsVector( + BaseIntVector targetRunEndsVector, + BaseIntVector deltaRunEndsVector, + int targetLogicalValueCount) { + int targetPhysicalValueCount = targetRunEndsVector.getValueCount(); + int newPhysicalValueCount = targetPhysicalValueCount + deltaRunEndsVector.getValueCount(); + + // make sure there is enough capacity + while (targetVector.getValueCapacity() < newPhysicalValueCount) { + targetVector.reAlloc(); + } + + // append validity buffer + BitVectorHelper.concatBits( + targetRunEndsVector.getValidityBuffer(), + targetRunEndsVector.getValueCount(), + deltaRunEndsVector.getValidityBuffer(), + deltaRunEndsVector.getValueCount(), + targetRunEndsVector.getValidityBuffer()); + + // shift and append data buffer + shiftAndAppendRunEndsDataBuffer( + targetRunEndsVector, + targetPhysicalValueCount, deltaRunEndsVector.getDataBuffer(), targetLogicalValueCount, deltaRunEndsVector.getValueCount()); - // Append the now-shifted delta run-ends vector to the target. - new VectorAppender(targetRunEndsVector).visit((BaseFixedWidthVector) deltaRunEndsVector, null); - - targetEncodedVector.setValueCount(targetLogicalValueCount + deltaVector.getValueCount()); - - return targetVector; + targetRunEndsVector.setValueCount(newPhysicalValueCount); } - private void shiftRunEndsVector( - ValueVector toRunEndVector, ArrowBuf fromRunEndBuffer, int offset, int physicalLength) { + private void shiftAndAppendRunEndsDataBuffer( + BaseIntVector toRunEndVector, + int toIndex, + ArrowBuf fromRunEndBuffer, + int offset, + int physicalLength) { ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer(); if (toRunEndVector instanceof SmallIntVector) { byte typeWidth = SmallIntVector.TYPE_WIDTH; for (int i = 0; i < physicalLength; i++) { toRunEndBuffer.setShort( - (long) i * typeWidth, fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset); + (long) (i + toIndex) * typeWidth, + fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset); } } else if (toRunEndVector instanceof IntVector) { byte typeWidth = IntVector.TYPE_WIDTH; for (int i = 0; i < physicalLength; i++) { toRunEndBuffer.setInt( - (long) i * typeWidth, fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset); + (long) (i + toIndex) * typeWidth, + fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset); } } else if (toRunEndVector instanceof BigIntVector) { byte typeWidth = BigIntVector.TYPE_WIDTH; for (int i = 0; i < physicalLength; i++) { toRunEndBuffer.setLong( - (long) i * typeWidth, fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset); + (long) (i + toIndex) * typeWidth, + fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset); } } else { throw new IllegalArgumentException( From 0b26bb7adac1a7806b256c42dd8cbf554026a967 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Fri, 31 Oct 2025 11:51:39 +0800 Subject: [PATCH 5/6] Check that delta is unchanged --- .../apache/arrow/vector/util/TestVectorAppender.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java index 3b457d6af5..3fa9d03c91 100644 --- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java +++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java @@ -1076,6 +1076,17 @@ public void testAppendRunEndEncodedVector() { assertVectorsEqual(expected, target); } + + // Check that delta is unchanged. + final Field expectedDeltaField = new Field("expectedDelta", reeFieldType, children); + try (RunEndEncodedVector expectedDelta = + new RunEndEncodedVector(expectedDeltaField, allocator, null)) { + expectedDelta.allocateNew(); + ValueVectorDataPopulator.setVector((IntVector) expectedDelta.getValuesVector(), 3, 4, 5, null); + ValueVectorDataPopulator.setVector((IntVector) expectedDelta.getRunEndsVector(), 1, 3, 4, 6); + expectedDelta.setValueCount(6); + assertVectorsEqual(expectedDelta, delta); + } } } From 8a122a6d00a7202df2e2ce962e186b59a7a336d8 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Fri, 31 Oct 2025 14:06:59 +0800 Subject: [PATCH 6/6] format --- .../org/apache/arrow/vector/util/TestVectorAppender.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java index 3fa9d03c91..df5521a1ad 100644 --- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java +++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java @@ -1082,8 +1082,10 @@ public void testAppendRunEndEncodedVector() { try (RunEndEncodedVector expectedDelta = new RunEndEncodedVector(expectedDeltaField, allocator, null)) { expectedDelta.allocateNew(); - ValueVectorDataPopulator.setVector((IntVector) expectedDelta.getValuesVector(), 3, 4, 5, null); - ValueVectorDataPopulator.setVector((IntVector) expectedDelta.getRunEndsVector(), 1, 3, 4, 6); + ValueVectorDataPopulator.setVector( + (IntVector) expectedDelta.getValuesVector(), 3, 4, 5, null); + ValueVectorDataPopulator.setVector( + (IntVector) expectedDelta.getRunEndsVector(), 1, 3, 4, 6); expectedDelta.setValueCount(6); assertVectorsEqual(expectedDelta, delta); }