Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions core/src/main/java/tech/ydb/core/utils/UpdatableOptional.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tech.ydb.core.utils;

import java.util.Optional;

/**
*
* @author Aleksandr Gorshenin
* @param <R> type of value
*/
public class UpdatableOptional<R> {
private volatile R value = null;

public void update(R value) {
this.value = value;
}

public R orElse(R other) {
return Optional.ofNullable(value).orElse(other);
}

public R get() {
return value;
}
}
34 changes: 13 additions & 21 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.URITools;
import tech.ydb.core.utils.UpdatableOptional;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
Expand Down Expand Up @@ -287,9 +288,9 @@ void handleCompletion(Status status, Throwable th) { }

@Override
public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
final CompletableFuture<Result<QueryInfo>> result = new CompletableFuture<>();
final AtomicReference<QueryStats> stats = new AtomicReference<>();
grpcStream.start(msg -> {
final UpdatableOptional<Status> operationStatus = new UpdatableOptional<>();
final UpdatableOptional<QueryStats> stats = new UpdatableOptional<>();
return grpcStream.start(msg -> {
if (isTraceEnabled) {
logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg));
}
Expand All @@ -300,7 +301,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {

if (!status.isSuccess()) {
handleTxMeta(null);
result.complete(Result.fail(status));
operationStatus.update(status);
return;
}

Expand All @@ -315,10 +316,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
}
}
if (msg.hasExecStats()) {
QueryStats old = stats.getAndSet(new QueryStats(msg.getExecStats()));
if (old != null) {
logger.warn("{} lost previous exec stats {}", SessionImpl.this, old);
}
stats.update(new QueryStats(msg.getExecStats()));
}

if (msg.hasResultSet()) {
Expand All @@ -329,21 +327,15 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
logger.trace("{} lost result set part with index {}", SessionImpl.this, index);
}
}
}).whenComplete((status, th) -> {
handleCompletion(status, th);
if (th != null) {
result.completeExceptionally(th);
}
if (status != null) {
updateSessionState(status);
if (status.isSuccess()) {
result.complete(Result.success(new QueryInfo(stats.get()), status));
} else {
result.complete(Result.fail(status));
}
}).whenComplete(this::handleCompletion).thenApply(streamStatus -> {
updateSessionState(streamStatus);
Status status = operationStatus.orElse(streamStatus);
if (status.isSuccess()) {
return Result.success(new QueryInfo(stats.get()), streamStatus);
} else {
return Result.fail(status);
}
});
return result;
}

@Override
Expand Down
19 changes: 10 additions & 9 deletions query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
* @author Aleksandr Gorshenin
*/
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
private volatile Queue<Status> overrideQueue = new ConcurrentLinkedQueue<>();

public void reset() {
nextStatus.clear();
overrideQueue = new ConcurrentLinkedQueue<>();
}

public void addNextStatus(Status status) {
nextStatus.add(status);
public void addOverrideStatus(Status status) {
overrideQueue.add(status);
}

@Override
Expand All @@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder<?> t) {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ProxyClientCall<>(next, method, callOptions);
return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions);
}

private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private static class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final ClientCall<ReqT, RespT> delegate;
private final Status overriden;

private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
private ProxyClientCall(Channel channel, Status overriden, MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
this.delegate = channel.newCall(method, callOptions);
this.overriden = overriden;
}

@Override
Expand Down Expand Up @@ -110,8 +112,7 @@ public void onMessage(RespT message) {

@Override
public void onClose(Status status, Metadata trailers) {
Status next = nextStatus.poll();
delegate.onClose(next != null ? next : status, trailers);
delegate.onClose(overriden != null ? overriden : status, trailers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void sessionExecuteQueryTest() {
QuerySession s1 = getSession();
String id1 = s1.getId();

grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);

Result<QueryInfo> res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());
Expand Down
30 changes: 18 additions & 12 deletions table/src/main/java/tech/ydb/table/impl/BaseSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.core.utils.URITools;
import tech.ydb.core.utils.UpdatableOptional;
import tech.ydb.proto.StatusCodesProtos.StatusIds;
import tech.ydb.proto.ValueProtos;
import tech.ydb.proto.ValueProtos.TypedValue;
Expand Down Expand Up @@ -1387,42 +1388,47 @@ public String toString() {

private abstract class ProxyStream<R, T> implements GrpcReadStream<T> {
private final GrpcReadStream<R> origin;
private final CompletableFuture<Status> result = new CompletableFuture<>();
private final CompletableFuture<Status> result;
private final UpdatableOptional<Status> operationStatus = new UpdatableOptional<>();
private final UpdatableOptional<Throwable> operationError = new UpdatableOptional<>();

ProxyStream(GrpcReadStream<R> origin) {
this.origin = origin;
this.result = new CompletableFuture<>();
}

abstract StatusIds.StatusCode readStatusCode(R message);
abstract List<YdbIssueMessage.IssueMessage> readIssues(R message);
abstract T readValue(R message);

private void onClose(Status status, Throwable th) {
private void onClose(Status streamStatus, Throwable streamError) {
Throwable th = operationError.orElse(streamError);
if (th != null) {
updateSessionState(th, null, false);
result.completeExceptionally(th);
return;
}
if (status != null) {
updateSessionState(null, status.getCode(), false);
result.complete(status);

Status st = operationStatus.orElse(streamStatus);
if (st != null) {
updateSessionState(null, st.getCode(), false);
result.complete(st);
}
}

@Override
public CompletableFuture<Status> start(Observer<T> observer) {
origin.start(message -> {
StatusIds.StatusCode statusCode = readStatusCode(message);
if (statusCode == StatusIds.StatusCode.SUCCESS) {
StatusIds.StatusCode code = readStatusCode(message);
if (code == StatusIds.StatusCode.SUCCESS) {
try {
observer.onNext(readValue(message));
} catch (Throwable t) {
result.completeExceptionally(t);
} catch (Throwable th) {
operationError.update(th);
origin.cancel();
}
} else {
Issue[] issues = Issue.fromPb(readIssues(message));
StatusCode code = StatusCode.fromProto(statusCode);
result.complete(Status.of(code, issues));
operationStatus.update(Status.of(StatusCode.fromProto(code), Issue.fromPb(readIssues(message))));
origin.cancel();
}
}).whenComplete(this::onClose);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
* @author Aleksandr Gorshenin
*/
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
private volatile Queue<Status> overrideQueue = new ConcurrentLinkedQueue<>();

public void reset() {
nextStatus.clear();
overrideQueue = new ConcurrentLinkedQueue<>();
}

public void addNextStatus(Status status) {
nextStatus.add(status);
public void addOverrideStatus(Status status) {
overrideQueue.add(status);
}

@Override
Expand All @@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder<?> t) {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ProxyClientCall<>(next, method, callOptions);
return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions);
}

private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private static class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final ClientCall<ReqT, RespT> delegate;
private final Status overriden;

private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
private ProxyClientCall(Channel channel, Status overriden, MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
this.delegate = channel.newCall(method, callOptions);
this.overriden = overriden;
}

@Override
Expand Down Expand Up @@ -110,8 +112,7 @@ public void onMessage(RespT message) {

@Override
public void onClose(Status status, Metadata trailers) {
Status next = nextStatus.poll();
delegate.onClose(next != null ? next : status, trailers);
delegate.onClose(overriden != null ? overriden : status, trailers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void sessionExecuteDataQueryTest() {
Session s1 = getSession();
String id1 = s1.getId();

grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);

Result<DataQueryResult> res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join();
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());
Expand Down Expand Up @@ -128,7 +128,7 @@ public void sessionExecuteScanQueryTest() {
Session s1 = getSession();
String id1 = s1.getId();

grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);

Status res = s1.executeScanQuery("SELECT 1 + 2", Params.empty(), settings).start(rsr -> {}).join();
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getCode());
Expand Down