Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BaseLargeVariableWidthVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.ExtensionTypeVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.compare.TypeEqualsVisitor;
import org.apache.arrow.vector.compare.VectorVisitor;
Expand All @@ -39,6 +43,7 @@
import org.apache.arrow.vector.complex.LargeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.complex.RunEndEncodedVector;
import org.apache.arrow.vector.complex.UnionVector;

/** Utility to append two vectors together. */
Expand Down Expand Up @@ -698,4 +703,98 @@ public ValueVector visit(ExtensionTypeVector<?> deltaVector, Void value) {
deltaVector.getUnderlyingVector().accept(underlyingAppender, null);
return targetVector;
}

@Override
public ValueVector visit(RunEndEncodedVector deltaVector, Void value) {
Preconditions.checkArgument(
typeVisitor.equals(deltaVector),
"The deltaVector to append must have the same type as the targetVector");

if (deltaVector.getValueCount() == 0) {
return targetVector; // optimization, nothing to append, return
}

RunEndEncodedVector targetEncodedVector = (RunEndEncodedVector) targetVector;

final int targetLogicalValueCount = targetEncodedVector.getValueCount();

// Append the values vector first.
VectorAppender valueAppender = new VectorAppender(targetEncodedVector.getValuesVector());
deltaVector.getValuesVector().accept(valueAppender, null);

// Then append the run-ends vector.
BaseIntVector targetRunEndsVector = (BaseIntVector) targetEncodedVector.getRunEndsVector();
BaseIntVector deltaRunEndsVector = (BaseIntVector) deltaVector.getRunEndsVector();
appendRunEndsVector(targetRunEndsVector, deltaRunEndsVector, targetLogicalValueCount);

targetEncodedVector.setValueCount(targetLogicalValueCount + deltaVector.getValueCount());
return targetVector;
}

private void appendRunEndsVector(
BaseIntVector targetRunEndsVector,
BaseIntVector deltaRunEndsVector,
int targetLogicalValueCount) {
int targetPhysicalValueCount = targetRunEndsVector.getValueCount();
int newPhysicalValueCount = targetPhysicalValueCount + deltaRunEndsVector.getValueCount();

// make sure there is enough capacity
while (targetVector.getValueCapacity() < newPhysicalValueCount) {
targetVector.reAlloc();
}

// append validity buffer
BitVectorHelper.concatBits(
targetRunEndsVector.getValidityBuffer(),
targetRunEndsVector.getValueCount(),
deltaRunEndsVector.getValidityBuffer(),
deltaRunEndsVector.getValueCount(),
targetRunEndsVector.getValidityBuffer());

// shift and append data buffer
shiftAndAppendRunEndsDataBuffer(
targetRunEndsVector,
targetPhysicalValueCount,
deltaRunEndsVector.getDataBuffer(),
targetLogicalValueCount,
deltaRunEndsVector.getValueCount());

targetRunEndsVector.setValueCount(newPhysicalValueCount);
}

private void shiftAndAppendRunEndsDataBuffer(
BaseIntVector toRunEndVector,
int toIndex,
ArrowBuf fromRunEndBuffer,
int offset,
int physicalLength) {
ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer();
if (toRunEndVector instanceof SmallIntVector) {
byte typeWidth = SmallIntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLength; i++) {
toRunEndBuffer.setShort(
(long) (i + toIndex) * typeWidth,
fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset);
}

} else if (toRunEndVector instanceof IntVector) {
byte typeWidth = IntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLength; i++) {
toRunEndBuffer.setInt(
(long) (i + toIndex) * typeWidth,
fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset);
}

} else if (toRunEndVector instanceof BigIntVector) {
byte typeWidth = BigIntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLength; i++) {
toRunEndBuffer.setLong(
(long) (i + toIndex) * typeWidth,
fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset);
}
} else {
throw new IllegalArgumentException(
"Run-end vector and must be of type int with size 16, 32, or 64 bits.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.LargeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.RunEndEncodedVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.holders.NullableBigIntHolder;
Expand Down Expand Up @@ -1025,6 +1026,59 @@ public void testAppendDenseUnionVectorMismatch() {
}
}

@Test
public void testAppendRunEndEncodedVector() {
final FieldType reeFieldType = FieldType.notNullable(ArrowType.RunEndEncoded.INSTANCE);
final Field runEndsField =
new Field("runEnds", FieldType.notNullable(Types.MinorType.INT.getType()), null);
final Field valuesField = Field.nullable("values", Types.MinorType.INT.getType());
final List<Field> children = Arrays.asList(runEndsField, valuesField);

final Field targetField = new Field("target", reeFieldType, children);
final Field deltaField = new Field("delta", reeFieldType, children);
try (RunEndEncodedVector target = new RunEndEncodedVector(targetField, allocator, null);
RunEndEncodedVector delta = new RunEndEncodedVector(deltaField, allocator, null)) {

// populate target
target.allocateNew();
// data: [1, 1, 2, null, 3, 3, 3] (7 values)
// values: [1, 2, null, 3]
// runEnds: [2, 3, 4, 7]
ValueVectorDataPopulator.setVector((IntVector) target.getValuesVector(), 1, 2, null, 3);
ValueVectorDataPopulator.setVector((IntVector) target.getRunEndsVector(), 2, 3, 4, 7);
target.setValueCount(7);

// populate delta
delta.allocateNew();
// data: [3, 4, 4, 5, null, null] (6 values)
// values: [3, 4, 5, null]
// runEnds: [1, 3, 4, 6]
ValueVectorDataPopulator.setVector((IntVector) delta.getValuesVector(), 3, 4, 5, null);
ValueVectorDataPopulator.setVector((IntVector) delta.getRunEndsVector(), 1, 3, 4, 6);
delta.setValueCount(6);

VectorAppender appender = new VectorAppender(target);
delta.accept(appender, null);

assertEquals(13, target.getValueCount());

final Field expectedField = new Field("expected", reeFieldType, children);
try (RunEndEncodedVector expected = new RunEndEncodedVector(expectedField, allocator, null)) {
expected.allocateNew();
// expected data: [1, 1, 2, null, 3, 3, 3, 3, 4, 4, 5, null, null] (13 values)
// expected values: [1, 2, null, 3, 3, 4, 5, null]
// expected runEnds: [2, 3, 4, 7, 8, 10, 11, 13]
ValueVectorDataPopulator.setVector(
(IntVector) expected.getValuesVector(), 1, 2, null, 3, 3, 4, 5, null);
ValueVectorDataPopulator.setVector(
(IntVector) expected.getRunEndsVector(), 2, 3, 4, 7, 8, 10, 11, 13);
expected.setValueCount(13);

assertVectorsEqual(expected, target);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also just check that delta vector was not modified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, let me do that.

}

@Test
public void testAppendVectorNegative() {
final int vectorLength = 10;
Expand Down