diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index 0dc96a4d4b..e7c0d11cb9 100644 --- a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -24,13 +24,17 @@ import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BaseIntVector; import org.apache.arrow.vector.BaseLargeVariableWidthVector; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BaseVariableWidthViewVector; +import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.ExtensionTypeVector; +import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.compare.TypeEqualsVisitor; import org.apache.arrow.vector.compare.VectorVisitor; @@ -39,6 +43,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility to append two vectors together. 
*/
@@ -698,4 +703,135 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) {
     deltaVector.getUnderlyingVector().accept(underlyingAppender, null);
     return targetVector;
   }
+
+  /**
+   * Appends a run-end encoded delta vector to the run-end encoded target vector.
+   *
+   * <p>The delta's values child is appended to the target's values child with a nested {@link
+   * VectorAppender}. The delta's run ends are then copied behind the target's run ends, each
+   * increased by the logical value count the target had before the append. Runs are copied as
+   * they are; a run at the seam is not merged with its neighbour.
+   *
+   * @param deltaVector the run-end encoded vector to append
+   * @param value unused
+   * @return the target vector
+   * @throws IllegalArgumentException if the type check against the target vector fails
+   */
+  @Override
+  public ValueVector visit(RunEndEncodedVector deltaVector, Void value) {
+    Preconditions.checkArgument(
+        typeVisitor.equals(deltaVector),
+        "The deltaVector to append must have the same type as the targetVector");
+
+    final int deltaLogicalValueCount = deltaVector.getValueCount();
+    if (deltaLogicalValueCount == 0) {
+      return targetVector; // optimization, nothing to append, return
+    }
+
+    RunEndEncodedVector targetEncodedVector = (RunEndEncodedVector) targetVector;
+
+    // Read before anything is appended: this is the shift applied to every delta run end.
+    final int targetLogicalValueCount = targetEncodedVector.getValueCount();
+
+    // Append the values vector first.
+    VectorAppender valueAppender = new VectorAppender(targetEncodedVector.getValuesVector());
+    deltaVector.getValuesVector().accept(valueAppender, null);
+
+    // Then append the run-ends vector.
+    BaseIntVector targetRunEndsVector = (BaseIntVector) targetEncodedVector.getRunEndsVector();
+    BaseIntVector deltaRunEndsVector = (BaseIntVector) deltaVector.getRunEndsVector();
+    appendRunEndsVector(targetRunEndsVector, deltaRunEndsVector, targetLogicalValueCount);
+
+    targetEncodedVector.setValueCount(targetLogicalValueCount + deltaLogicalValueCount);
+    return targetVector;
+  }
+
+  /**
+   * Appends the delta run ends behind the target run ends, adding {@code targetLogicalValueCount}
+   * to every appended run end.
+   *
+   * @param targetRunEndsVector the run-ends vector that receives the new entries
+   * @param deltaRunEndsVector the run-ends vector whose entries are appended; it is only read
+   * @param targetLogicalValueCount the amount added to each appended run end
+   */
+  private void appendRunEndsVector(
+      BaseIntVector targetRunEndsVector,
+      BaseIntVector deltaRunEndsVector,
+      int targetLogicalValueCount) {
+    final int targetPhysicalValueCount = targetRunEndsVector.getValueCount();
+    final int deltaPhysicalValueCount = deltaRunEndsVector.getValueCount();
+    final int newPhysicalValueCount = targetPhysicalValueCount + deltaPhysicalValueCount;
+
+    // Make sure there is enough capacity. The check and the growth are done on the run-ends
+    // vector because that is the vector whose buffers are written below.
+    while (targetRunEndsVector.getValueCapacity() < newPhysicalValueCount) {
+      targetRunEndsVector.reAlloc();
+    }
+
+    // Append the validity buffer. Buffers are looked up only after the capacity loop.
+    BitVectorHelper.concatBits(
+        targetRunEndsVector.getValidityBuffer(),
+        targetPhysicalValueCount,
+        deltaRunEndsVector.getValidityBuffer(),
+        deltaPhysicalValueCount,
+        targetRunEndsVector.getValidityBuffer());
+
+    // Shift and append the data buffer.
+    shiftAndAppendRunEndsDataBuffer(
+        targetRunEndsVector,
+        targetPhysicalValueCount,
+        deltaRunEndsVector.getDataBuffer(),
+        targetLogicalValueCount,
+        deltaPhysicalValueCount);
+
+    targetRunEndsVector.setValueCount(newPhysicalValueCount);
+  }
+
+  /**
+   * Copies {@code physicalLength} run ends from {@code fromRunEndBuffer} into the data buffer of
+   * {@code toRunEndVector}, starting at element {@code toIndex}, adding {@code offset} to each.
+   *
+   * @param toRunEndVector the destination run-ends vector; its concrete type selects the width
+   * @param toIndex the element index in the destination at which the first run end is written
+   * @param fromRunEndBuffer the source data buffer, read from element 0
+   * @param offset the amount added to every copied run end
+   * @param physicalLength the number of run ends to copy
+   * @throws IllegalArgumentException if {@code toRunEndVector} is not a {@link SmallIntVector},
+   *     {@link IntVector} or {@link BigIntVector}
+   */
+  private void shiftAndAppendRunEndsDataBuffer(
+      BaseIntVector toRunEndVector,
+      int toIndex,
+      ArrowBuf fromRunEndBuffer,
+      int offset,
+      int physicalLength) {
+    ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer();
+    if (toRunEndVector instanceof SmallIntVector) {
+      byte typeWidth = SmallIntVector.TYPE_WIDTH;
+      for (int i = 0; i < physicalLength; i++) {
+        // NOTE(review): the sum is an int handed to setShort, which presumably keeps only the
+        // low 16 bits - confirm that callers keep the combined length within 16-bit range.
+        toRunEndBuffer.setShort(
+            (long) (i + toIndex) * typeWidth,
+            fromRunEndBuffer.getShort((long) i * typeWidth) + offset);
+      }
+    } else if (toRunEndVector instanceof IntVector) {
+      byte typeWidth = IntVector.TYPE_WIDTH;
+      for (int i = 0; i < physicalLength; i++) {
+        toRunEndBuffer.setInt(
+            (long) (i + toIndex) * typeWidth,
+            fromRunEndBuffer.getInt((long) i * typeWidth) + offset);
+      }
+    } else if (toRunEndVector instanceof BigIntVector) {
+      byte typeWidth = BigIntVector.TYPE_WIDTH;
+      for (int i = 0; i < physicalLength; i++) {
+        toRunEndBuffer.setLong(
+            (long) (i + toIndex) * typeWidth,
+            fromRunEndBuffer.getLong((long) i * typeWidth) + offset);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          "Run-end vector must be of type int with size 16, 32, or 64 bits.");
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java
index 4ee9630a4d..df5521a1ad 100644
--- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java
+++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java
@@ -47,6 +47,7 @@
 import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.LargeListVector;
import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.RunEndEncodedVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.complex.UnionVector;
 import org.apache.arrow.vector.holders.NullableBigIntHolder;
@@ -1025,6 +1026,76 @@ public void testAppendDenseUnionVectorMismatch() {
     }
   }
 
+  /**
+   * Appends one run-end encoded vector to another and checks the logical value count, the
+   * combined values and run ends, and that the appended vector is left unchanged.
+   */
+  @Test
+  public void testAppendRunEndEncodedVector() {
+    final FieldType reeFieldType = FieldType.notNullable(ArrowType.RunEndEncoded.INSTANCE);
+    final Field runEndsField =
+        new Field("runEnds", FieldType.notNullable(Types.MinorType.INT.getType()), null);
+    final Field valuesField = Field.nullable("values", Types.MinorType.INT.getType());
+    final List<Field> children = Arrays.asList(runEndsField, valuesField);
+
+    final Field targetField = new Field("target", reeFieldType, children);
+    final Field deltaField = new Field("delta", reeFieldType, children);
+    try (RunEndEncodedVector target = new RunEndEncodedVector(targetField, allocator, null);
+        RunEndEncodedVector delta = new RunEndEncodedVector(deltaField, allocator, null)) {
+
+      // populate target
+      target.allocateNew();
+      // data: [1, 1, 2, null, 3, 3, 3] (7 values)
+      // values: [1, 2, null, 3]
+      // runEnds: [2, 3, 4, 7]
+      ValueVectorDataPopulator.setVector((IntVector) target.getValuesVector(), 1, 2, null, 3);
+      ValueVectorDataPopulator.setVector((IntVector) target.getRunEndsVector(), 2, 3, 4, 7);
+      target.setValueCount(7);
+
+      // populate delta
+      delta.allocateNew();
+      // data: [3, 4, 4, 5, null, null] (6 values)
+      // values: [3, 4, 5, null]
+      // runEnds: [1, 3, 4, 6]
+      ValueVectorDataPopulator.setVector((IntVector) delta.getValuesVector(), 3, 4, 5, null);
+      ValueVectorDataPopulator.setVector((IntVector) delta.getRunEndsVector(), 1, 3, 4, 6);
+      delta.setValueCount(6);
+
+      VectorAppender appender = new VectorAppender(target);
+      delta.accept(appender, null);
+
+      assertEquals(13, target.getValueCount());
+
+      final Field expectedField = new Field("expected", reeFieldType, children);
+      try (RunEndEncodedVector expected = new RunEndEncodedVector(expectedField, allocator, null)) {
+        expected.allocateNew();
+        // expected data: [1, 1, 2, null, 3, 3, 3, 3, 4, 4, 5, null, null] (13 values)
+        // expected values: [1, 2, null, 3, 3, 4, 5, null]
+        // expected runEnds: [2, 3, 4, 7, 8, 10, 11, 13]
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expected.getValuesVector(), 1, 2, null, 3, 3, 4, 5, null);
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expected.getRunEndsVector(), 2, 3, 4, 7, 8, 10, 11, 13);
+        expected.setValueCount(13);
+
+        assertVectorsEqual(expected, target);
+      }
+
+      // Check that delta is unchanged.
+      final Field expectedDeltaField = new Field("expectedDelta", reeFieldType, children);
+      try (RunEndEncodedVector expectedDelta =
+          new RunEndEncodedVector(expectedDeltaField, allocator, null)) {
+        expectedDelta.allocateNew();
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expectedDelta.getValuesVector(), 3, 4, 5, null);
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expectedDelta.getRunEndsVector(), 1, 3, 4, 6);
+        expectedDelta.setValueCount(6);
+        assertVectorsEqual(expectedDelta, delta);
+      }
+    }
+  }
+
   @Test
   public void testAppendVectorNegative() {
     final int vectorLength = 10;