From fe1b8ee5b3a72017c45807244f35c116991aa4a1 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 3 Sep 2025 11:26:09 +0100 Subject: [PATCH 1/3] Fixed test flapping --- .../ydb/query/impl/GrpcTestInterceptor.java | 19 ++++++++++--------- .../tech/ydb/query/impl/QueryClientTest.java | 2 +- .../integration/GrpcTestInterceptor.java | 19 ++++++++++--------- .../table/integration/TableClientTest.java | 4 ++-- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java b/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java index 50c6a75e..fb5d697d 100644 --- a/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java +++ b/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java @@ -21,14 +21,14 @@ * @author Aleksandr Gorshenin */ public class GrpcTestInterceptor implements Consumer>, ClientInterceptor { - private final Queue nextStatus = new ConcurrentLinkedQueue<>(); + private volatile Queue overrideQueue = new ConcurrentLinkedQueue<>(); public void reset() { - nextStatus.clear(); + overrideQueue = new ConcurrentLinkedQueue<>(); } - public void addNextStatus(Status status) { - nextStatus.add(status); + public void addOverrideStatus(Status status) { + overrideQueue.add(status); } @Override @@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder t) { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ProxyClientCall<>(next, method, callOptions); + return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions); } - private class ProxyClientCall extends ClientCall { + private static class ProxyClientCall extends ClientCall { private final ClientCall delegate; + private final Status overrided; - private ProxyClientCall(Channel channel, MethodDescriptor method, + private ProxyClientCall(Channel channel, Status overrided, MethodDescriptor method, CallOptions callOptions) { this.delegate = channel.newCall(method, callOptions); + this.overrided = overrided; } @Override @@ -110,8 +112,7 @@ public void onMessage(RespT message) { @Override public void onClose(Status status, Metadata trailers) { - Status next = nextStatus.poll(); - delegate.onClose(next != null ? next : status, trailers); + delegate.onClose(overrided != null ? overrided : status, trailers); } @Override diff --git a/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java b/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java index b15793b3..427a4007 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java @@ -94,7 +94,7 @@ public void sessionExecuteQueryTest() { QuerySession s1 = getSession(); String id1 = s1.getId(); - grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE); + grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE); Result res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join(); Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode()); diff --git a/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java b/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java index 468553fb..bcb14cc6 100644 --- a/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java +++ b/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java @@ -21,14 +21,14 @@ * @author Aleksandr Gorshenin */ public class GrpcTestInterceptor implements Consumer>, ClientInterceptor { - private final Queue nextStatus = new ConcurrentLinkedQueue<>(); + private volatile Queue overrideQueue = new ConcurrentLinkedQueue<>(); public void reset() { - nextStatus.clear(); + overrideQueue = new ConcurrentLinkedQueue<>(); } - public void addNextStatus(Status status) { - nextStatus.add(status); + public void addOverrideStatus(Status status) { + overrideQueue.add(status); } @Override @@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder t) { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ProxyClientCall<>(next, method, callOptions); + return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions); } - private class ProxyClientCall extends ClientCall { + private static class ProxyClientCall extends ClientCall { private final ClientCall delegate; + private final Status overrided; - private ProxyClientCall(Channel channel, MethodDescriptor method, + private ProxyClientCall(Channel channel, Status overrided, MethodDescriptor method, CallOptions callOptions) { this.delegate = channel.newCall(method, callOptions); + this.overrided = overrided; } @Override @@ -110,8 +112,7 @@ public void onMessage(RespT message) { @Override public void onClose(Status status, Metadata trailers) { - Status next = nextStatus.poll(); - delegate.onClose(next != null ? next : status, trailers); + delegate.onClose(overrided != null ? overrided : status, trailers); } @Override diff --git a/table/src/test/java/tech/ydb/table/integration/TableClientTest.java b/table/src/test/java/tech/ydb/table/integration/TableClientTest.java index ee96039d..5a2a303f 100644 --- a/table/src/test/java/tech/ydb/table/integration/TableClientTest.java +++ b/table/src/test/java/tech/ydb/table/integration/TableClientTest.java @@ -97,7 +97,7 @@ public void sessionExecuteDataQueryTest() { Session s1 = getSession(); String id1 = s1.getId(); - grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE); + grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE); Result res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join(); Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode()); @@ -128,7 +128,7 @@ public void sessionExecuteScanQueryTest() { Session s1 = getSession(); String id1 = s1.getId(); - grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE); + grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE); Status res = s1.executeScanQuery("SELECT 1 + 2", Params.empty(), settings).start(rsr -> {}).join(); Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getCode()); From 7e0648fc4cd4786029cb1d2667ae598476255d7b Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 3 Sep 2025 12:19:11 +0100 Subject: [PATCH 2/3] Fixed data race in ProxyStream --- .../ydb/core/utils/UpdatableOptional.java | 24 +++++++++++++ .../java/tech/ydb/query/impl/SessionImpl.java | 34 +++++++------------ .../java/tech/ydb/table/impl/BaseSession.java | 32 ++++++++++------- 3 files changed, 56 insertions(+), 34 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/utils/UpdatableOptional.java diff --git a/core/src/main/java/tech/ydb/core/utils/UpdatableOptional.java b/core/src/main/java/tech/ydb/core/utils/UpdatableOptional.java new file mode 100644 index 00000000..a5e24bf8 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/utils/UpdatableOptional.java @@ -0,0 +1,24 @@ +package tech.ydb.core.utils; + +import java.util.Optional; + +/** + * + * @author Aleksandr Gorshenin + * @param type of value + */ +public class UpdatableOptional { + private volatile R value = null; + + public void update(R value) { + this.value = value; + } + + public R orElse(R other) { + return Optional.ofNullable(value).orElse(other); + } + + public R get() { + return value; + } +} diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 96c1cd3f..c146018b 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -24,6 +24,7 @@ import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; +import tech.ydb.core.utils.UpdatableOptional; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; @@ -287,9 +288,9 @@ void handleCompletion(Status status, Throwable th) { } @Override public CompletableFuture> execute(PartsHandler handler) { - final CompletableFuture> result = new CompletableFuture<>(); - final AtomicReference stats = new AtomicReference<>(); - grpcStream.start(msg -> { + final UpdatableOptional operationStatus = new UpdatableOptional<>(); + final UpdatableOptional stats = new UpdatableOptional<>(); + return grpcStream.start(msg -> { if (isTraceEnabled) { logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); } @@ -300,7 +301,7 @@ public CompletableFuture> execute(PartsHandler handler) { if (!status.isSuccess()) { handleTxMeta(null); - result.complete(Result.fail(status)); + operationStatus.update(status); return; } @@ -315,10 +316,7 @@ public CompletableFuture> execute(PartsHandler handler) { } } if (msg.hasExecStats()) { - QueryStats old = stats.getAndSet(new QueryStats(msg.getExecStats())); - if (old != null) { - logger.warn("{} lost previous exec stats {}", SessionImpl.this, old); - } + stats.update(new QueryStats(msg.getExecStats())); } if (msg.hasResultSet()) { @@ -329,21 +327,15 @@ public CompletableFuture> execute(PartsHandler handler) { logger.trace("{} lost result set part with index {}", SessionImpl.this, index); } } - }).whenComplete((status, th) -> { - handleCompletion(status, th); - if (th != null) { - result.completeExceptionally(th); - } - if (status != null) { - updateSessionState(status); - if (status.isSuccess()) { - result.complete(Result.success(new QueryInfo(stats.get()), status)); - } else { - result.complete(Result.fail(status)); - } + }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { + updateSessionState(streamStatus); + Status status = operationStatus.orElse(streamStatus); + if (status.isSuccess()) { + return Result.success(new QueryInfo(stats.get()), streamStatus); + } else { + return Result.fail(status); } }); - return result; } @Override diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 4bc106d8..0ec036aa 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -36,6 +36,7 @@ import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.core.utils.URITools; +import tech.ydb.core.utils.UpdatableOptional; import tech.ydb.proto.StatusCodesProtos.StatusIds; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.ValueProtos.TypedValue; @@ -1387,42 +1388,47 @@ public String toString() { private abstract class ProxyStream implements GrpcReadStream { private final GrpcReadStream origin; - private final CompletableFuture result = new CompletableFuture<>(); + private final CompletableFuture result; + private final UpdatableOptional operationStatus = new UpdatableOptional<>(); + private final UpdatableOptional operationError = new UpdatableOptional<>(); ProxyStream(GrpcReadStream origin) { this.origin = origin; + this.result = new CompletableFuture<>(); } abstract StatusIds.StatusCode readStatusCode(R message); abstract List readIssues(R message); abstract T readValue(R message); - private void onClose(Status status, Throwable th) { + private void onClose(Status streamStatus, Throwable streamError) { + Throwable th = operationError.orElse(streamError); if (th != null) { updateSessionState(th, null, false); result.completeExceptionally(th); + return; } - if (status != null) { - updateSessionState(null, status.getCode(), false); - result.complete(status); + + Status st = operationStatus.orElse(streamStatus); + if (st != null) { + updateSessionState(null, st.getCode(), false); + result.complete(st); } } @Override public CompletableFuture start(Observer observer) { - origin.start(message -> { - StatusIds.StatusCode statusCode = readStatusCode(message); + origin.start(msg -> { + StatusIds.StatusCode statusCode = readStatusCode(msg); if (statusCode == StatusIds.StatusCode.SUCCESS) { try { - observer.onNext(readValue(message)); - } catch (Throwable t) { - result.completeExceptionally(t); + observer.onNext(readValue(msg)); + } catch (Throwable th) { + operationError.update(th); origin.cancel(); } } else { - Issue[] issues = Issue.fromPb(readIssues(message)); - StatusCode code = StatusCode.fromProto(statusCode); - result.complete(Status.of(code, issues)); + operationStatus.update(Status.of(StatusCode.fromProto(statusCode), Issue.fromPb(readIssues(msg)))); origin.cancel(); } }).whenComplete(this::onClose); From 3cda13c5dadfefd97faa133970bc759fbcfd6c5d Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 3 Sep 2025 15:22:05 +0100 Subject: [PATCH 3/3] Typo fixes --- .../java/tech/ydb/query/impl/GrpcTestInterceptor.java | 8 ++++---- .../src/main/java/tech/ydb/table/impl/BaseSession.java | 10 +++++----- .../ydb/table/integration/GrpcTestInterceptor.java | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java b/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java index fb5d697d..f154d4fb 100644 --- a/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java +++ b/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java @@ -44,12 +44,12 @@ public ClientCall interceptCall( private static class ProxyClientCall extends ClientCall { private final ClientCall delegate; - private final Status overrided; + private final Status overriden; - private ProxyClientCall(Channel channel, Status overrided, MethodDescriptor method, + private ProxyClientCall(Channel channel, Status overriden, MethodDescriptor method, CallOptions callOptions) { this.delegate = channel.newCall(method, callOptions); - this.overrided = overrided; + this.overriden = overriden; } @Override @@ -112,7 +112,7 @@ public void onMessage(RespT message) { @Override public void onClose(Status status, Metadata trailers) { - delegate.onClose(overrided != null ? overrided : status, trailers); + delegate.onClose(overriden != null ? overriden : status, trailers); } @Override diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 0ec036aa..9874552c 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -1418,17 +1418,17 @@ private void onClose(Status streamStatus, Throwable streamError) { @Override public CompletableFuture start(Observer observer) { - origin.start(msg -> { - StatusIds.StatusCode statusCode = readStatusCode(msg); - if (statusCode == StatusIds.StatusCode.SUCCESS) { + origin.start(message -> { + StatusIds.StatusCode code = readStatusCode(message); + if (code == StatusIds.StatusCode.SUCCESS) { try { - observer.onNext(readValue(msg)); + observer.onNext(readValue(message)); } catch (Throwable th) { operationError.update(th); origin.cancel(); } } else { - operationStatus.update(Status.of(StatusCode.fromProto(statusCode), Issue.fromPb(readIssues(msg)))); + operationStatus.update(Status.of(StatusCode.fromProto(code), Issue.fromPb(readIssues(message)))); origin.cancel(); } }).whenComplete(this::onClose); diff --git a/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java b/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java index bcb14cc6..43f1fda6 100644 --- a/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java +++ b/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java @@ -44,12 +44,12 @@ public ClientCall interceptCall( private static class ProxyClientCall extends ClientCall { private final ClientCall delegate; - private final Status overrided; + private final Status overriden; - private ProxyClientCall(Channel channel, Status overrided, MethodDescriptor method, + private ProxyClientCall(Channel channel, Status overriden, MethodDescriptor method, CallOptions callOptions) { this.delegate = channel.newCall(method, callOptions); - this.overrided = overrided; + this.overriden = overriden; } @Override @@ -112,7 +112,7 @@ public void onMessage(RespT message) { @Override public void onClose(Status status, Metadata trailers) { - delegate.onClose(overrided != null ? overrided : status, trailers); + delegate.onClose(overriden != null ? overriden : status, trailers); } @Override