Skip to content

Commit 10f72d6

Browse files
committed
WIP WIP WIP Handle indeterminate YDB request state
1 parent ac58941 commit 10f72d6

File tree

23 files changed

+591
-201
lines changed

23 files changed

+591
-201
lines changed

aspect/src/main/java/tech/ydb/yoj/aspect/tx/YojTransactionAspect.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.aspectj.lang.annotation.Aspect;
77
import tech.ydb.yoj.repository.db.Tx;
88
import tech.ydb.yoj.repository.db.TxManager;
9+
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
910
import tech.ydb.yoj.repository.db.exception.RetryableException;
1011

1112
/**
@@ -68,7 +69,7 @@ private Object doInTransaction(ProceedingJoinPoint pjp, YojTransactional transac
6869

6970
return localTx.tx(() -> safeCall(pjp));
7071
}
71-
} catch (CallRetryableException | CallException e) {
72+
} catch (CallRetryableException | CallConditionallyRetryableException | CallException e) {
7273
throw e.getCause();
7374
}
7475
}
@@ -88,17 +89,28 @@ Object safeCall(ProceedingJoinPoint pjp) {
8889
return pjp.proceed();
8990
} catch (RetryableException e) {
9091
throw new CallRetryableException(e);
92+
} catch (ConditionallyRetryableException e) {
93+
throw new CallConditionallyRetryableException(e);
9194
} catch (Throwable e) {
9295
throw new CallException(e);
9396
}
9497
}
9598

9699
/**
97-
* It's a hint for tx manager to retry was requested
100+
* It's a hint for tx manager that an unconditional retry was requested
98101
*/
99102
static class CallRetryableException extends RetryableException {
100103
CallRetryableException(RetryableException e) {
101-
super(e.getMessage(), e.getCause());
104+
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
105+
}
106+
}
107+
108+
/**
109+
* It's a hint for tx manager that a conditional retry was requested
110+
*/
111+
static class CallConditionallyRetryableException extends ConditionallyRetryableException {
112+
CallConditionallyRetryableException(ConditionallyRetryableException e) {
113+
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
102114
}
103115
}
104116

repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import java.util.function.Supplier;
2424

2525
public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransaction {
26-
private final static AtomicLong txIdGenerator = new AtomicLong();
26+
private static final String CLOSE_ACTION_COMMIT = "commit()";
27+
private static final String CLOSE_ACTION_ROLLBACK = "rollback()";
28+
29+
private static final AtomicLong txIdGenerator = new AtomicLong();
2730

2831
private final long txId = txIdGenerator.incrementAndGet();
2932
private final Stopwatch txStopwatch = Stopwatch.createStarted();
@@ -81,7 +84,12 @@ public void commit() {
8184
if (isBadSession) {
8285
throw new IllegalStateException("Transaction was invalidated. Commit isn't possible");
8386
}
84-
endTransaction("commit()", this::commitImpl);
87+
endTransaction(CLOSE_ACTION_COMMIT, this::commitImpl);
88+
}
89+
90+
@Override
91+
public boolean wasCommitAttempted() {
92+
return CLOSE_ACTION_COMMIT.equals(closeAction);
8593
}
8694

8795
private void commitImpl() {
@@ -101,14 +109,15 @@ private void commitImpl() {
101109

102110
@Override
103111
public void rollback() {
104-
endTransaction("rollback()", this::rollbackImpl);
112+
endTransaction(CLOSE_ACTION_ROLLBACK, this::rollbackImpl);
105113
}
106114

107115
private void rollbackImpl() {
108116
storage.rollback(txId);
109117
}
110118

111119
private void endTransaction(String action, Runnable runnable) {
120+
ensureTransactionActive();
112121
try {
113122
if (isFinalActionNeeded(action)) {
114123
logTransaction(action, runnable);
@@ -134,6 +143,7 @@ private boolean isFinalActionNeeded(String action) {
134143
final <T extends Entity<T>> void doInWriteTransaction(
135144
String log, TableDescriptor<T> tableDescriptor, Consumer<WriteTxDataShard<T>> consumer
136145
) {
146+
ensureTransactionActive();
137147
if (options.isScan()) {
138148
throw new IllegalTransactionScanException("Mutable operations");
139149
}
@@ -158,6 +168,7 @@ final <T extends Entity<T>> void doInWriteTransaction(
158168
final <T extends Entity<T>, R> R doInTransaction(
159169
String action, TableDescriptor<T> tableDescriptor, Function<ReadOnlyTxDataShard<T>, R> func
160170
) {
171+
ensureTransactionActive();
161172
return logTransaction(action, () -> {
162173
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
163174
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(
@@ -180,10 +191,6 @@ private void logTransaction(String action, Runnable runnable) {
180191
}
181192

182193
private <R> R logTransaction(String action, Supplier<R> supplier) {
183-
if (closeAction != null) {
184-
throw new IllegalStateException("Transaction already closed by " + closeAction);
185-
}
186-
187194
Stopwatch sw = Stopwatch.createStarted();
188195
try {
189196
R result = supplier.get();
@@ -195,6 +202,12 @@ private <R> R logTransaction(String action, Supplier<R> supplier) {
195202
}
196203
}
197204

205+
private void ensureTransactionActive() {
206+
if (closeAction != null) {
207+
throw new IllegalStateException("Transaction already closed by " + closeAction);
208+
}
209+
}
210+
198211
private String printResult(Object result) {
199212
if (result instanceof Iterable<?>) {
200213
long size = Iterables.size((Iterable<?>) result);
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package tech.ydb.yoj.repository.ydb.exception;
22

33
import tech.ydb.yoj.repository.db.exception.RetryableException;
4+
import tech.ydb.yoj.util.retry.RetryPolicy;
45

56
/**
67
* Tried to use a no longer active or valid YDB session, e.g. on a node that is now down.
78
*/
89
public class BadSessionException extends RetryableException {
910
public BadSessionException(String message) {
10-
super(message);
11+
super(message, RetryPolicy.retryImmediately());
1112
}
1213
}

repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbClientInternalException.java

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package tech.ydb.yoj.repository.ydb.exception;
2+
3+
import lombok.Getter;
4+
import tech.ydb.yoj.ExperimentalApi;
5+
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
6+
import tech.ydb.yoj.util.lang.Strings;
7+
import tech.ydb.yoj.util.retry.RetryPolicy;
8+
9+
/**
10+
* Base class for <em>conditionally-retryable</em> exceptions from the YDB database, the YDB Java SDK, and the GRPC client used by the YDB Java SDK.
11+
*
12+
* @see ConditionallyRetryableException Conditionally-retryable Exceptions
13+
*/
14+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165")
15+
public class YdbConditionallyRetryableException extends ConditionallyRetryableException {
16+
private static final RetryPolicy UNDETERMINED_BACKOFF = RetryPolicy.expBackoff(5L, 500L, 0.1, 2.0);
17+
18+
@Getter
19+
private final Enum<?> statusCode;
20+
21+
public YdbConditionallyRetryableException(String message, Enum<?> statusCode, Object request, Object response) {
22+
this(message, statusCode, request, response, UNDETERMINED_BACKOFF);
23+
}
24+
25+
public YdbConditionallyRetryableException(String message, Enum<?> statusCode, Object request, Object response, RetryPolicy retryPolicy) {
26+
super(Strings.join("\n", "[" + statusCode + "] " + message, request, response), retryPolicy);
27+
this.statusCode = statusCode;
28+
}
29+
}

repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthenticatedException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import tech.ydb.yoj.repository.db.exception.RetryableException;
55
import tech.ydb.yoj.repository.db.exception.UnavailableException;
66
import tech.ydb.yoj.util.lang.Strings;
7+
import tech.ydb.yoj.util.retry.RetryPolicy;
78

89
/**
910
* YDB authentication failure, possibly a transient one. E.g., used a recently expired token.
1011
*/
1112
public class YdbUnauthenticatedException extends RetryableException {
1213
public YdbUnauthenticatedException(Object request, Object response) {
13-
super(Strings.join("\n", request, response));
14+
super(Strings.join("\n", request, response), RetryPolicy.retryImmediately());
1415
}
1516

1617
@Override

repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthorizedException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
import tech.ydb.yoj.repository.db.exception.RetryableException;
55
import tech.ydb.yoj.repository.db.exception.UnavailableException;
66
import tech.ydb.yoj.util.lang.Strings;
7+
import tech.ydb.yoj.util.retry.RetryPolicy;
78

89
/**
910
* YDB authorization failure, possibly a transient one. E.g., the principal tried to write to the database but has no
1011
* write-allowing role assigned.
1112
*/
1213
public class YdbUnauthorizedException extends RetryableException {
1314
public YdbUnauthorizedException(Object request, Object response) {
14-
super(Strings.join("\n", request, response));
15+
super(Strings.join("\n", request, response), RetryPolicy.retryImmediately());
1516
}
1617

1718
@Override

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import java.util.concurrent.TimeUnit;
1616
import java.util.concurrent.TimeoutException;
1717

18-
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.checkGrpcContextStatus;
18+
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.checkGrpcTimeoutAndCancellation;
1919
import static tech.ydb.yoj.util.lang.Interrupts.isThreadInterrupted;
2020

2121
@InternalApi
@@ -40,7 +40,7 @@ private static RepositoryException convertToUnavailable(Throwable ex) {
4040
Thread.currentThread().interrupt();
4141
return new QueryInterruptedException("DB query interrupted", ex);
4242
}
43-
checkGrpcContextStatus(ex.getMessage(), ex);
43+
checkGrpcTimeoutAndCancellation(ex.getMessage(), ex);
4444

4545
return new UnavailableException("DB is unavailable", ex);
4646
}

0 commit comments

Comments
 (0)