Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTablePar
}

private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
return ReadTableParams.<ID>builder().useNewSpliterator(true);
return ReadTableParams.<ID>builder().spliteratorType(ReadTableParams.SpliteratorType.EXPERIMENTAL);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
Expand All @@ -25,6 +26,7 @@
import tech.ydb.table.query.stats.QueryStats;
import tech.ydb.table.query.stats.QueryStatsCollectionMode;
import tech.ydb.table.query.stats.TableAccessStats;
import tech.ydb.table.query.ReadTablePart;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.CommitTxSettings;
Expand Down Expand Up @@ -70,6 +72,14 @@
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueueImpl;
import tech.ydb.yoj.repository.ydb.statement.Statement;
import tech.ydb.yoj.repository.ydb.table.YdbTable;
import tech.ydb.yoj.util.lang.Interrupts;
Expand All @@ -78,6 +88,7 @@
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -101,7 +112,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final String PROP_TRACE_VERBOSE_OBJ_RESULTS = "tech.ydb.yoj.repository.ydb.trace.verboseObjResults";

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();

@Getter
private final TxOptions options;
Expand All @@ -127,8 +138,8 @@ public YdbRepositoryTransaction(REPO repo, TxOptions options) {
this.tablespace = repo.getSchemaOperations().getTablespace();
}

private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
spliterators.add(spliterator);
return spliterator;
}
Expand Down Expand Up @@ -183,7 +194,7 @@ private void doCommit() {

private void closeStreams() {
Exception summaryException = null;
for (YdbSpliterator<?> spliterator : spliterators) {
for (ClosableSpliterator<?> spliterator : spliterators) {
try {
spliterator.close();
} catch (Exception e) {
Expand Down Expand Up @@ -451,7 +462,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
Expand Down Expand Up @@ -559,38 +570,65 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
settings.toKey(TupleValue.of(values), params.isToInclusive());
}

if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
return switch (params.getSpliteratorType()) {
case LEGACY -> {
try {
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
doCall("read table " + mapper.getTableName(""), () -> {
Status status = YdbOperations.safeJoin(
session.readTable(
tableName,
settings.build(),
rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action)
),
params.getTimeout().plusMinutes(5)
);
validate("readTable", status.getCode(), status.toString());
})
);
yield spliterator.makeStream();
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not read table " + tableName, e);
}
}
case LEGACY_SLOW -> {
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
tableName, settings.build(),
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);
initSession();
session.readTable(
tableName, settings.build(),
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);

return spliterator.createStream();
}
yield spliterator.createStream();
}
case EXPERIMENTAL -> {
initSession();

// TODO: configure stream timeout
// TODO: configure batch count
YojQueue<Iterator<RESULT>> queue = YojQueueImpl.create(0, Duration.ofMinutes(5));

var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue);
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
readTablePart.getResultSetReader(),
mapper::mapResult
);
adapter.onNext(iterator);
});
future.whenComplete(adapter::onSupplierThreadComplete);

try {
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
doCall("read table " + mapper.getTableName(""), () -> {
Status status = YdbOperations.safeJoin(
session.readTable(
tableName,
settings.build(),
rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action)
),
params.getTimeout().plusMinutes(5)
);
validate("readTable", status.getCode(), status.toString());
})
);
return spliterator.makeStream();
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not read table " + tableName, e);
}
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());

spliterators.add(spliterator);

yield spliterator.createStream();
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import java.util.Spliterator;

public interface ClosableSpliterator<V> extends Spliterator<V> {
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.proto.ValueProtos;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class ResultSetIterator<V> implements Iterator<V> {
private final ResultSetReader resultSet;
private final ResultConverter<V> converter;
private final List<ValueProtos.Column> columns;

private int position = 0;

public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
List<ValueProtos.Column> columns;
if (resultSet.getRowCount() > 0) {
resultSet.setRowIndex(0);
columns = getColumns(resultSet);
} else {
columns = new ArrayList<>();
}

this.resultSet = resultSet;
this.converter = converter;
this.columns = columns;
}

@Override
public boolean hasNext() {
return position < resultSet.getRowCount();
}

@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

ValueProtos.Value value = buildValue(position++);

return converter.convert(columns, value);
}

private ValueProtos.Value buildValue(int rowIndex) {
resultSet.setRowIndex(rowIndex);
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
for (int i = 0; i < columns.size(); i++) {
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i)));
}
return value.build();
}

private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
List<ValueProtos.Column> columns = new ArrayList<>();
for (int i = 0; i < resultSet.getColumnCount(); i++) {
columns.add(ValueProtos.Column.newBuilder()
.setName(resultSet.getColumnName(i))
.build()
);
}
return columns;
}

@FunctionalInterface
public interface ResultConverter<V> {
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue;
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojSpliteratorQueue;

import java.util.Collections;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
private final YojSpliteratorQueue<Iterator<V>> queue;
private final int flags;

private Iterator<V> valueIterator = Collections.emptyIterator();

private boolean closed = false;

public YdbSpliterator(YojQueue<Iterator<V>> queue, boolean isOrdered) {
this.queue = queue;
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
}

// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
public Stream<V> createStream() {
return StreamSupport.stream(this, false).onClose(this::close);
}

@Override
public void close() {
if (closed) {
return;
}
closed = true;
queue.close();
}

@Override
public boolean tryAdvance(Consumer<? super V> action) {
if (closed) {
return false;
}

// queue could return empty iterator, we have to select one with elements
while (!valueIterator.hasNext()) {
valueIterator = queue.poll();
if (valueIterator == null) {
close();
return false;
}
}

V value = valueIterator.next();

action.accept(value);

return true;
}

@Override
public Spliterator<V> trySplit() {
return null;
}

@Override
public long estimateSize() {
return Long.MAX_VALUE;
}

@Override
public long getExactSizeIfKnown() {
return -1;
}

@Override
public int characteristics() {
return flags;
}
}
Loading
Loading