Skip to content

Commit 7e0648f

Browse files
committed
Fixed data race in ProxyStream
1 parent fe1b8ee commit 7e0648f

File tree

3 files changed

+56
-34
lines changed

3 files changed

+56
-34
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package tech.ydb.core.utils;
2+
3+
import java.util.Optional;
4+
5+
/**
6+
*
7+
* @author Aleksandr Gorshenin
8+
* @param <R> type of value
9+
*/
10+
public class UpdatableOptional<R> {
11+
private volatile R value = null;
12+
13+
public void update(R value) {
14+
this.value = value;
15+
}
16+
17+
public R orElse(R other) {
18+
return Optional.ofNullable(value).orElse(other);
19+
}
20+
21+
public R get() {
22+
return value;
23+
}
24+
}

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import tech.ydb.core.operation.StatusExtractor;
2525
import tech.ydb.core.settings.BaseRequestSettings;
2626
import tech.ydb.core.utils.URITools;
27+
import tech.ydb.core.utils.UpdatableOptional;
2728
import tech.ydb.proto.query.YdbQuery;
2829
import tech.ydb.query.QuerySession;
2930
import tech.ydb.query.QueryStream;
@@ -287,9 +288,9 @@ void handleCompletion(Status status, Throwable th) { }
287288

288289
@Override
289290
public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
290-
final CompletableFuture<Result<QueryInfo>> result = new CompletableFuture<>();
291-
final AtomicReference<QueryStats> stats = new AtomicReference<>();
292-
grpcStream.start(msg -> {
291+
final UpdatableOptional<Status> operationStatus = new UpdatableOptional<>();
292+
final UpdatableOptional<QueryStats> stats = new UpdatableOptional<>();
293+
return grpcStream.start(msg -> {
293294
if (isTraceEnabled) {
294295
logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg));
295296
}
@@ -300,7 +301,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
300301

301302
if (!status.isSuccess()) {
302303
handleTxMeta(null);
303-
result.complete(Result.fail(status));
304+
operationStatus.update(status);
304305
return;
305306
}
306307

@@ -315,10 +316,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
315316
}
316317
}
317318
if (msg.hasExecStats()) {
318-
QueryStats old = stats.getAndSet(new QueryStats(msg.getExecStats()));
319-
if (old != null) {
320-
logger.warn("{} lost previous exec stats {}", SessionImpl.this, old);
321-
}
319+
stats.update(new QueryStats(msg.getExecStats()));
322320
}
323321

324322
if (msg.hasResultSet()) {
@@ -329,21 +327,15 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
329327
logger.trace("{} lost result set part with index {}", SessionImpl.this, index);
330328
}
331329
}
332-
}).whenComplete((status, th) -> {
333-
handleCompletion(status, th);
334-
if (th != null) {
335-
result.completeExceptionally(th);
336-
}
337-
if (status != null) {
338-
updateSessionState(status);
339-
if (status.isSuccess()) {
340-
result.complete(Result.success(new QueryInfo(stats.get()), status));
341-
} else {
342-
result.complete(Result.fail(status));
343-
}
330+
}).whenComplete(this::handleCompletion).thenApply(streamStatus -> {
331+
updateSessionState(streamStatus);
332+
Status status = operationStatus.orElse(streamStatus);
333+
if (status.isSuccess()) {
334+
return Result.success(new QueryInfo(stats.get()), streamStatus);
335+
} else {
336+
return Result.fail(status);
344337
}
345338
});
346-
return result;
347339
}
348340

349341
@Override

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import tech.ydb.core.settings.BaseRequestSettings;
3737
import tech.ydb.core.utils.ProtobufUtils;
3838
import tech.ydb.core.utils.URITools;
39+
import tech.ydb.core.utils.UpdatableOptional;
3940
import tech.ydb.proto.StatusCodesProtos.StatusIds;
4041
import tech.ydb.proto.ValueProtos;
4142
import tech.ydb.proto.ValueProtos.TypedValue;
@@ -1387,42 +1388,47 @@ public String toString() {
13871388

13881389
private abstract class ProxyStream<R, T> implements GrpcReadStream<T> {
13891390
private final GrpcReadStream<R> origin;
1390-
private final CompletableFuture<Status> result = new CompletableFuture<>();
1391+
private final CompletableFuture<Status> result;
1392+
private final UpdatableOptional<Status> operationStatus = new UpdatableOptional<>();
1393+
private final UpdatableOptional<Throwable> operationError = new UpdatableOptional<>();
13911394

13921395
ProxyStream(GrpcReadStream<R> origin) {
13931396
this.origin = origin;
1397+
this.result = new CompletableFuture<>();
13941398
}
13951399

13961400
abstract StatusIds.StatusCode readStatusCode(R message);
13971401
abstract List<YdbIssueMessage.IssueMessage> readIssues(R message);
13981402
abstract T readValue(R message);
13991403

1400-
private void onClose(Status status, Throwable th) {
1404+
private void onClose(Status streamStatus, Throwable streamError) {
1405+
Throwable th = operationError.orElse(streamError);
14011406
if (th != null) {
14021407
updateSessionState(th, null, false);
14031408
result.completeExceptionally(th);
1409+
return;
14041410
}
1405-
if (status != null) {
1406-
updateSessionState(null, status.getCode(), false);
1407-
result.complete(status);
1411+
1412+
Status st = operationStatus.orElse(streamStatus);
1413+
if (st != null) {
1414+
updateSessionState(null, st.getCode(), false);
1415+
result.complete(st);
14081416
}
14091417
}
14101418

14111419
@Override
14121420
public CompletableFuture<Status> start(Observer<T> observer) {
1413-
origin.start(message -> {
1414-
StatusIds.StatusCode statusCode = readStatusCode(message);
1421+
origin.start(msg -> {
1422+
StatusIds.StatusCode statusCode = readStatusCode(msg);
14151423
if (statusCode == StatusIds.StatusCode.SUCCESS) {
14161424
try {
1417-
observer.onNext(readValue(message));
1418-
} catch (Throwable t) {
1419-
result.completeExceptionally(t);
1425+
observer.onNext(readValue(msg));
1426+
} catch (Throwable th) {
1427+
operationError.update(th);
14201428
origin.cancel();
14211429
}
14221430
} else {
1423-
Issue[] issues = Issue.fromPb(readIssues(message));
1424-
StatusCode code = StatusCode.fromProto(statusCode);
1425-
result.complete(Status.of(code, issues));
1431+
operationStatus.update(Status.of(StatusCode.fromProto(statusCode), Issue.fromPb(readIssues(msg))));
14261432
origin.cancel();
14271433
}
14281434
}).whenComplete(this::onClose);

0 commit comments

Comments
 (0)