Skip to content

Commit 3b6d6e6

Browse files
authored
Merge pull request #536 from alex268/master
Fixed data race on stream result processing
2 parents acf9f06 + 3cda13c commit 3b6d6e6

File tree

7 files changed

+78
-54
lines changed

7 files changed

+78
-54
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package tech.ydb.core.utils;
2+
3+
import java.util.Optional;
4+
5+
/**
6+
*
7+
* @author Aleksandr Gorshenin
8+
* @param <R> type of value
9+
*/
10+
public class UpdatableOptional<R> {
11+
private volatile R value = null;
12+
13+
public void update(R value) {
14+
this.value = value;
15+
}
16+
17+
public R orElse(R other) {
18+
return Optional.ofNullable(value).orElse(other);
19+
}
20+
21+
public R get() {
22+
return value;
23+
}
24+
}

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import tech.ydb.core.operation.StatusExtractor;
2525
import tech.ydb.core.settings.BaseRequestSettings;
2626
import tech.ydb.core.utils.URITools;
27+
import tech.ydb.core.utils.UpdatableOptional;
2728
import tech.ydb.proto.query.YdbQuery;
2829
import tech.ydb.query.QuerySession;
2930
import tech.ydb.query.QueryStream;
@@ -287,9 +288,9 @@ void handleCompletion(Status status, Throwable th) { }
287288

288289
@Override
289290
public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
290-
final CompletableFuture<Result<QueryInfo>> result = new CompletableFuture<>();
291-
final AtomicReference<QueryStats> stats = new AtomicReference<>();
292-
grpcStream.start(msg -> {
291+
final UpdatableOptional<Status> operationStatus = new UpdatableOptional<>();
292+
final UpdatableOptional<QueryStats> stats = new UpdatableOptional<>();
293+
return grpcStream.start(msg -> {
293294
if (isTraceEnabled) {
294295
logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg));
295296
}
@@ -300,7 +301,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
300301

301302
if (!status.isSuccess()) {
302303
handleTxMeta(null);
303-
result.complete(Result.fail(status));
304+
operationStatus.update(status);
304305
return;
305306
}
306307

@@ -315,10 +316,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
315316
}
316317
}
317318
if (msg.hasExecStats()) {
318-
QueryStats old = stats.getAndSet(new QueryStats(msg.getExecStats()));
319-
if (old != null) {
320-
logger.warn("{} lost previous exec stats {}", SessionImpl.this, old);
321-
}
319+
stats.update(new QueryStats(msg.getExecStats()));
322320
}
323321

324322
if (msg.hasResultSet()) {
@@ -329,21 +327,15 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
329327
logger.trace("{} lost result set part with index {}", SessionImpl.this, index);
330328
}
331329
}
332-
}).whenComplete((status, th) -> {
333-
handleCompletion(status, th);
334-
if (th != null) {
335-
result.completeExceptionally(th);
336-
}
337-
if (status != null) {
338-
updateSessionState(status);
339-
if (status.isSuccess()) {
340-
result.complete(Result.success(new QueryInfo(stats.get()), status));
341-
} else {
342-
result.complete(Result.fail(status));
343-
}
330+
}).whenComplete(this::handleCompletion).thenApply(streamStatus -> {
331+
updateSessionState(streamStatus);
332+
Status status = operationStatus.orElse(streamStatus);
333+
if (status.isSuccess()) {
334+
return Result.success(new QueryInfo(stats.get()), streamStatus);
335+
} else {
336+
return Result.fail(status);
344337
}
345338
});
346-
return result;
347339
}
348340

349341
@Override

query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
* @author Aleksandr Gorshenin
2222
*/
2323
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
24-
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
24+
private volatile Queue<Status> overrideQueue = new ConcurrentLinkedQueue<>();
2525

2626
public void reset() {
27-
nextStatus.clear();
27+
overrideQueue = new ConcurrentLinkedQueue<>();
2828
}
2929

30-
public void addNextStatus(Status status) {
31-
nextStatus.add(status);
30+
public void addOverrideStatus(Status status) {
31+
overrideQueue.add(status);
3232
}
3333

3434
@Override
@@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder<?> t) {
3939
@Override
4040
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
4141
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
42-
return new ProxyClientCall<>(next, method, callOptions);
42+
return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions);
4343
}
4444

45-
private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
45+
private static class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
4646
private final ClientCall<ReqT, RespT> delegate;
47+
private final Status overriden;
4748

48-
private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
49+
private ProxyClientCall(Channel channel, Status overriden, MethodDescriptor<ReqT, RespT> method,
4950
CallOptions callOptions) {
5051
this.delegate = channel.newCall(method, callOptions);
52+
this.overriden = overriden;
5153
}
5254

5355
@Override
@@ -110,8 +112,7 @@ public void onMessage(RespT message) {
110112

111113
@Override
112114
public void onClose(Status status, Metadata trailers) {
113-
Status next = nextStatus.poll();
114-
delegate.onClose(next != null ? next : status, trailers);
115+
delegate.onClose(overriden != null ? overriden : status, trailers);
115116
}
116117

117118
@Override

query/src/test/java/tech/ydb/query/impl/QueryClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void sessionExecuteQueryTest() {
9494
QuerySession s1 = getSession();
9595
String id1 = s1.getId();
9696

97-
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
97+
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);
9898

9999
Result<QueryInfo> res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
100100
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import tech.ydb.core.settings.BaseRequestSettings;
3737
import tech.ydb.core.utils.ProtobufUtils;
3838
import tech.ydb.core.utils.URITools;
39+
import tech.ydb.core.utils.UpdatableOptional;
3940
import tech.ydb.proto.StatusCodesProtos.StatusIds;
4041
import tech.ydb.proto.ValueProtos;
4142
import tech.ydb.proto.ValueProtos.TypedValue;
@@ -1387,42 +1388,47 @@ public String toString() {
13871388

13881389
private abstract class ProxyStream<R, T> implements GrpcReadStream<T> {
13891390
private final GrpcReadStream<R> origin;
1390-
private final CompletableFuture<Status> result = new CompletableFuture<>();
1391+
private final CompletableFuture<Status> result;
1392+
private final UpdatableOptional<Status> operationStatus = new UpdatableOptional<>();
1393+
private final UpdatableOptional<Throwable> operationError = new UpdatableOptional<>();
13911394

13921395
ProxyStream(GrpcReadStream<R> origin) {
13931396
this.origin = origin;
1397+
this.result = new CompletableFuture<>();
13941398
}
13951399

13961400
abstract StatusIds.StatusCode readStatusCode(R message);
13971401
abstract List<YdbIssueMessage.IssueMessage> readIssues(R message);
13981402
abstract T readValue(R message);
13991403

1400-
private void onClose(Status status, Throwable th) {
1404+
private void onClose(Status streamStatus, Throwable streamError) {
1405+
Throwable th = operationError.orElse(streamError);
14011406
if (th != null) {
14021407
updateSessionState(th, null, false);
14031408
result.completeExceptionally(th);
1409+
return;
14041410
}
1405-
if (status != null) {
1406-
updateSessionState(null, status.getCode(), false);
1407-
result.complete(status);
1411+
1412+
Status st = operationStatus.orElse(streamStatus);
1413+
if (st != null) {
1414+
updateSessionState(null, st.getCode(), false);
1415+
result.complete(st);
14081416
}
14091417
}
14101418

14111419
@Override
14121420
public CompletableFuture<Status> start(Observer<T> observer) {
14131421
origin.start(message -> {
1414-
StatusIds.StatusCode statusCode = readStatusCode(message);
1415-
if (statusCode == StatusIds.StatusCode.SUCCESS) {
1422+
StatusIds.StatusCode code = readStatusCode(message);
1423+
if (code == StatusIds.StatusCode.SUCCESS) {
14161424
try {
14171425
observer.onNext(readValue(message));
1418-
} catch (Throwable t) {
1419-
result.completeExceptionally(t);
1426+
} catch (Throwable th) {
1427+
operationError.update(th);
14201428
origin.cancel();
14211429
}
14221430
} else {
1423-
Issue[] issues = Issue.fromPb(readIssues(message));
1424-
StatusCode code = StatusCode.fromProto(statusCode);
1425-
result.complete(Status.of(code, issues));
1431+
operationStatus.update(Status.of(StatusCode.fromProto(code), Issue.fromPb(readIssues(message))));
14261432
origin.cancel();
14271433
}
14281434
}).whenComplete(this::onClose);

table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
* @author Aleksandr Gorshenin
2222
*/
2323
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
24-
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
24+
private volatile Queue<Status> overrideQueue = new ConcurrentLinkedQueue<>();
2525

2626
public void reset() {
27-
nextStatus.clear();
27+
overrideQueue = new ConcurrentLinkedQueue<>();
2828
}
2929

30-
public void addNextStatus(Status status) {
31-
nextStatus.add(status);
30+
public void addOverrideStatus(Status status) {
31+
overrideQueue.add(status);
3232
}
3333

3434
@Override
@@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder<?> t) {
3939
@Override
4040
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
4141
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
42-
return new ProxyClientCall<>(next, method, callOptions);
42+
return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions);
4343
}
4444

45-
private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
45+
private static class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
4646
private final ClientCall<ReqT, RespT> delegate;
47+
private final Status overriden;
4748

48-
private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
49+
private ProxyClientCall(Channel channel, Status overriden, MethodDescriptor<ReqT, RespT> method,
4950
CallOptions callOptions) {
5051
this.delegate = channel.newCall(method, callOptions);
52+
this.overriden = overriden;
5153
}
5254

5355
@Override
@@ -110,8 +112,7 @@ public void onMessage(RespT message) {
110112

111113
@Override
112114
public void onClose(Status status, Metadata trailers) {
113-
Status next = nextStatus.poll();
114-
delegate.onClose(next != null ? next : status, trailers);
115+
delegate.onClose(overriden != null ? overriden : status, trailers);
115116
}
116117

117118
@Override

table/src/test/java/tech/ydb/table/integration/TableClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void sessionExecuteDataQueryTest() {
9797
Session s1 = getSession();
9898
String id1 = s1.getId();
9999

100-
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
100+
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);
101101

102102
Result<DataQueryResult> res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join();
103103
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());
@@ -128,7 +128,7 @@ public void sessionExecuteScanQueryTest() {
128128
Session s1 = getSession();
129129
String id1 = s1.getId();
130130

131-
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
131+
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);
132132

133133
Status res = s1.executeScanQuery("SELECT 1 + 2", Params.empty(), settings).start(rsr -> {}).join();
134134
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getCode());

0 commit comments

Comments
 (0)