diff --git a/.github/workflows/post-commit.yml b/.github/workflows/post-commit.yml
index 9978caacf8..13a9214445 100644
--- a/.github/workflows/post-commit.yml
+++ b/.github/workflows/post-commit.yml
@@ -14,18 +14,23 @@
# limitations under the License.
name: build-branch
on:
- - push
- - pull_request
+ push:
+ branches-ignore:
+ - 'dependabot/**'
+ tags:
+ - '**'
+ pull_request:
env:
WITH_COVERAGE: true
jobs:
build:
runs-on: ubuntu-20.04
+ timeout-minutes: 30
steps:
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Cache for maven dependencies
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: |
~/.m2/repository
@@ -34,14 +39,14 @@ jobs:
restore-keys: |
maven-repo-
- name: Setup java
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
- name: Run a full build
run: ./dev-support/checks/build.sh -Prelease assembly:single
- name: Store binaries for tests
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: ratis-bin
path: |
@@ -49,7 +54,7 @@ jobs:
!ratis-assembly/target/apache-ratis-*-src.tar.gz
retention-days: 1
- name: Store source tarball for compilation
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: ratis-src
path: ratis-assembly/target/apache-ratis-*-src.tar.gz
@@ -58,20 +63,21 @@ jobs:
needs:
- build
runs-on: ubuntu-20.04
+ timeout-minutes: 30
strategy:
matrix:
java: [ 11 ]
fail-fast: false
steps:
- name: Download source tarball
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
name: ratis-src
- name: Untar sources
run: |
tar --strip-components 1 -xzvf apache-ratis-*-src.tar.gz
- name: Cache for maven dependencies
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/.m2/repository
@@ -80,7 +86,7 @@ jobs:
restore-keys: |
maven-repo-
- name: Setup java
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: ${{ matrix.java }}
@@ -89,11 +95,12 @@ jobs:
rat:
name: rat
runs-on: ubuntu-20.04
+ timeout-minutes: 15
steps:
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Cache for maven dependencies
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/.m2/repository
@@ -104,7 +111,7 @@ jobs:
- name: Run tests
run: ./dev-support/checks/rat.sh
- name: Upload results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: rat
@@ -112,13 +119,14 @@ jobs:
author:
name: author
runs-on: ubuntu-20.04
+ timeout-minutes: 15
steps:
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Run tests
run: ./dev-support/checks/author.sh
- name: Upload results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: author
@@ -126,6 +134,7 @@ jobs:
unit:
name: unit
runs-on: ubuntu-20.04
+ timeout-minutes: 60
strategy:
matrix:
profile:
@@ -140,9 +149,9 @@ jobs:
echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
# REMOVE CODE ABOVE WHEN ISSUE IS ADDRESSED!
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Cache for maven dependencies
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/.m2/repository
@@ -151,7 +160,7 @@ jobs:
restore-keys: |
maven-repo-
- name: Setup java
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
@@ -161,7 +170,7 @@ jobs:
run: cat target/${{ github.job }}/summary.txt
if: ${{ !cancelled() }}
- name: Upload results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
with:
name: unit-${{ matrix.profile }}
@@ -169,11 +178,12 @@ jobs:
checkstyle:
name: checkstyle
runs-on: ubuntu-20.04
+ timeout-minutes: 15
steps:
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Cache for maven dependencies
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/.m2/repository
@@ -184,7 +194,7 @@ jobs:
- name: Run tests
run: ./dev-support/checks/checkstyle.sh
- name: Upload results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: checkstyle
@@ -192,16 +202,17 @@ jobs:
findbugs:
name: findbugs
runs-on: ubuntu-20.04
+ timeout-minutes: 30
steps:
- name: Setup java
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Cache for maven dependencies
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/.m2/repository
@@ -212,7 +223,7 @@ jobs:
- name: Run tests
run: ./dev-support/checks/findbugs.sh
- name: Upload results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: findbugs
@@ -222,14 +233,15 @@ jobs:
- build
- unit
runs-on: ubuntu-20.04
+ timeout-minutes: 30
if: (github.repository == 'apache/ratis' || github.repository == 'apache/incubator-ratis') && github.event_name != 'pull_request'
steps:
- name: Checkout project
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cache for maven dependencies
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/.m2/repository
@@ -238,12 +250,12 @@ jobs:
restore-keys: |
maven-repo-
- name: Setup java 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 17
- name: Download artifacts
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
path: target/artifacts
- name: Untar binaries
@@ -258,7 +270,7 @@ jobs:
SONAR_TOKEN: ${{ secrets.SONARCLOUD_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Archive build results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: ${{ github.job }}
diff --git a/.github/workflows/repeat-test.yml b/.github/workflows/repeat-test.yml
index 86150d2598..e3c05bec6a 100644
--- a/.github/workflows/repeat-test.yml
+++ b/.github/workflows/repeat-test.yml
@@ -87,11 +87,11 @@ jobs:
split: ${{ fromJson(needs.prepare.outputs.matrix) }}
fail-fast: ${{ fromJson(github.event.inputs.fail-fast) }}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.ref }}
- name: Cache for maven dependencies
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: |
~/.m2/repository
@@ -100,7 +100,7 @@ jobs:
restore-keys: |
maven-repo-
- name: Setup java
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
@@ -113,7 +113,7 @@ jobs:
run: dev-support/checks/_summary.sh target/unit/summary.txt
if: ${{ !cancelled() }}
- name: Archive build results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: result-${{ env.TEST_CLASS }}-split-${{ matrix.split }}
@@ -124,7 +124,7 @@ jobs:
runs-on: ubuntu-20.04
steps:
- name: Download build results
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
- name: Count failures
run: |
failures=$(find . -name 'summary.txt' | grep -v 'iteration' | xargs grep -v 'exit code: 0' | wc -l)
diff --git a/pom.xml b/pom.xml
index 68488077d4..0130258b18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,7 +176,7 @@
3.3.04.0.61.6.1
- 3.0.0
+ 3.0.0-M43.5.3
@@ -440,14 +440,6 @@
6.0.53provided
-
-
- com.github.spotbugs
- spotbugs-annotations
- ${spotbugs.version}
- provided
- true
-
@@ -642,6 +634,7 @@
falsefalsefalse
+ all600-Xmx2048m -XX:+HeapDumpOnOutOfMemoryError @{argLine}
diff --git a/ratis-assembly/src/main/resources/NOTICE b/ratis-assembly/src/main/resources/NOTICE
index 9bc9242c6a..0e3c94434b 100644
--- a/ratis-assembly/src/main/resources/NOTICE
+++ b/ratis-assembly/src/main/resources/NOTICE
@@ -292,27 +292,5 @@ networking library, which can be obtained at:
* HOMEPAGE:
* https://netty.io
* LOCATION_IN_GRPC:
-* netty/third_party/netty
------------------------------------------------------------------------
-The JSR-305 reference implementation (jsr305.jar) is distributed under the terms of the New BSD:
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice,
-this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
-this list of conditions and the following disclaimer in the documentation and/or
-other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
-OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
-CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
+ * netty/third_party/netty
-----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java
index edd0475442..f83d976040 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java
@@ -27,6 +27,24 @@
*/
public interface SnapshotManagementApi {
- /** trigger create snapshot file. */
- RaftClientReply create(long timeoutMs) throws IOException;
+ /** The same as create(0, timeoutMs). */
+ default RaftClientReply create(long timeoutMs) throws IOException {
+ return create(0, timeoutMs);
+ }
+
+ /** The same as create(force? 1 : 0, timeoutMs). */
+ default RaftClientReply create(boolean force, long timeoutMs) throws IOException {
+ return create(force? 1 : 0, timeoutMs);
+ }
+
+ /**
+ * Trigger to create a snapshot.
+ *
+ * @param creationGap When (creationGap > 0) and (lastAppliedIndex - lastSnapshotIndex < creationGap),
+ * return lastSnapshotIndex; otherwise, take a new snapshot and then return its index.
+ * When creationGap == 0, use the server configured value as the creationGap.
+ * @return a reply. When {@link RaftClientReply#isSuccess()} is true,
+ * {@link RaftClientReply#getLogIndex()} is the snapshot index fulfilling the operation.
+ */
+ RaftClientReply create(long creationGap, long timeoutMs) throws IOException;
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 4be9fa3275..76987801ba 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -119,16 +119,18 @@ public RaftClientRequest newRequestImpl() {
ioe = e;
}
- pending.incrementExceptionCount(ioe);
- ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
+ if (client.isClosed()) {
+ throw new AlreadyClosedException(this + " is closed.");
+ }
+
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, ioe);
final RetryPolicy retryPolicy = client.getRetryPolicy();
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
- TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime());
-
if (!action.shouldRetry()) {
- throw (IOException)client.noMoreRetries(event);
+ throw client.noMoreRetries(event);
}
+ final TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime());
try {
sleepTime.sleep();
} catch (InterruptedException e) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index db19831955..cab9606a0e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -364,6 +364,7 @@ static GroupInfoReplyProto toGroupInfoReplyProto(GroupInfoReply reply) {
b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy());
b.setRole(reply.getRoleInfoProto());
b.addAllCommitInfos(reply.getCommitInfos());
+ b.setLogInfo(reply.getLogInfoProto());
}
}
return b.build();
@@ -506,7 +507,8 @@ static GroupInfoReply toGroupInfoReply(GroupInfoReplyProto replyProto) {
ProtoUtils.toRaftGroup(replyProto.getGroup()),
replyProto.getRole(),
replyProto.getIsRaftStorageHealthy(),
- replyProto.hasConf()? replyProto.getConf(): null);
+ replyProto.hasConf()? replyProto.getConf(): null,
+ replyProto.getLogInfo());
}
static Message toMessage(final ClientMessageEntryProto p) {
@@ -657,7 +659,8 @@ static SnapshotManagementRequest toSnapshotManagementRequest(SnapshotManagementR
switch(p.getOpCase()) {
case CREATE:
return SnapshotManagementRequest.newCreate(clientId, serverId,
- ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId(), m.getTimeoutMs());
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId(), m.getTimeoutMs(),
+ p.getCreate().getCreationGap());
default:
throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p);
}
@@ -669,7 +672,7 @@ static SnapshotManagementRequestProto toSnapshotManagementRequestProto(
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
final SnapshotManagementRequest.Create create = request.getCreate();
if (create != null) {
- b.setCreate(SnapshotCreateRequestProto.newBuilder().build());
+ b.setCreate(SnapshotCreateRequestProto.newBuilder().setCreationGap(create.getCreationGap()).build());
}
return b.build();
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 26d01c356f..ba91866d71 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -40,6 +40,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -169,6 +170,10 @@ private CompletableFuture writeAsyncImpl(Object data, long leng
return f;
}
+ public CompletableFuture writeAsync(ByteBuf src, Iterable options) {
+ return writeAsyncImpl(src, src.readableBytes(), options);
+ }
+
@Override
public CompletableFuture writeAsync(ByteBuffer src, Iterable options) {
return writeAsyncImpl(src, src.remaining(), options);
@@ -235,7 +240,7 @@ public DataStreamClientRpc getClientRpc() {
}
@Override
- public DataStreamOutputRpc stream(RaftClientRequest request) {
+ public DataStreamOutputImpl stream(RaftClientRequest request) {
return new DataStreamOutputImpl(request);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index a1aa58681c..09c6cd4ac9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -32,6 +32,7 @@
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -50,6 +51,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
@@ -57,6 +59,10 @@
public final class OrderedAsync {
public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+ private enum BatchLogKey implements BatchLogger.Key {
+ SEND_REQUEST_EXCEPTION
+ }
+
static class PendingOrderedRequest extends PendingClientRequest
implements SlidingWindow.ClientSideRequest {
private final long callId;
@@ -149,10 +155,6 @@ private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
}
- private void handleAsyncRetryFailure(ClientRetryEvent event) {
- failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event));
- }
-
CompletableFuture send(RaftClientRequest.Type type, Message message, RaftPeerId server) {
if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) {
Objects.requireNonNull(message, "message == null");
@@ -187,85 +189,71 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) {
if (pending == null) {
return;
}
-
- final CompletableFuture f = pending.getReplyFuture();
- if (f.isDone()) {
+ if (pending.getReplyFuture().isDone()) {
return;
}
- final RaftClientRequest request = pending.newRequestImpl();
+ final RaftClientRequest request = pending.newRequest();
if (request == null) { // already done
- LOG.debug("{} newRequestImpl returns null", pending);
+ LOG.debug("{} newRequest returns null", pending);
return;
}
- final RetryPolicy retryPolicy = client.getRetryPolicy();
- sendRequest(pending).exceptionally(e -> {
- if (e instanceof CompletionException) {
- e = JavaUtils.unwrapCompletionException(e);
- scheduleWithTimeout(pending, request, retryPolicy, e);
- return null;
- }
- f.completeExceptionally(e);
- return null;
- });
- }
-
- private void scheduleWithTimeout(PendingOrderedRequest pending,
- RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) {
- final int attempt = pending.getAttemptCount();
- final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
- final TimeDuration sleepTime = client.getEffectiveSleepTime(e,
- retryPolicy.handleAttemptFailure(event).getSleepTime());
- LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", attempt, sleepTime, retryPolicy, request);
- scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
- }
-
- private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration sleepTime,
- SlidingWindow.Client slidingWindow) {
- client.getScheduler().onTimeout(sleepTime,
- () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
- LOG, () -> "Failed* to retry " + pending);
- }
-
- private CompletableFuture sendRequest(PendingOrderedRequest pending) {
- final RetryPolicy retryPolicy = client.getRetryPolicy();
- final RaftClientRequest request;
if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
pending.setFirstRequest();
}
- request = pending.newRequest();
LOG.debug("{}: send* {}", client.getId(), request);
- return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+ client.getClientRpc().sendRequestAsync(request).thenAccept(reply -> {
LOG.debug("{}: receive* {}", client.getId(), reply);
Objects.requireNonNull(reply, "reply == null");
client.handleReply(request, reply);
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetry);
- return reply;
}).exceptionally(e -> {
- LOG.error(client.getId() + ": Failed* " + request, e);
- e = JavaUtils.unwrapCompletionException(e);
- if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
- pending.incrementExceptionCount(e);
- final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
- if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
- handleAsyncRetryFailure(event);
- } else {
- if (e instanceof NotLeaderException) {
- NotLeaderException nle = (NotLeaderException)e;
- client.handleNotLeaderException(request, nle, this::resetSlidingWindow);
- } else {
- client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
- }
- }
- throw new CompletionException(e);
- }
- failAllAsyncRequests(request, e);
+ final Throwable exception = e;
+ final String key = client.getId() + "-" + request.getCallId() + "-" + exception;
+ final Consumer op = suffix -> LOG.error("{} {}: Failed* {}", suffix, client.getId(), request, exception);
+ BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
+ handleException(pending, request, e);
return null;
});
}
+ private void handleException(PendingOrderedRequest pending, RaftClientRequest request, Throwable e) {
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ if (client.isClosed()) {
+ failAllAsyncRequests(request, new AlreadyClosedException(client + " is closed."));
+ return;
+ }
+
+ e = JavaUtils.unwrapCompletionException(e);
+ if (!(e instanceof IOException) || e instanceof GroupMismatchException) {
+ // non-retryable exceptions
+ failAllAsyncRequests(request, e);
+ return;
+ }
+
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, e);
+ final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
+ if (!action.shouldRetry()) {
+ failAllAsyncRequests(request, client.noMoreRetries(event));
+ return;
+ }
+
+ if (e instanceof NotLeaderException) {
+ client.handleNotLeaderException(request, (NotLeaderException) e, this::resetSlidingWindow);
+ } else {
+ client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
+ }
+ final TimeDuration sleepTime = client.getEffectiveSleepTime(e, action.getSleepTime());
+ LOG.debug("schedule* retry with sleep {} for attempt #{} of {}, {}",
+ sleepTime, event.getAttemptCount(), request, retryPolicy);
+ final SlidingWindow.Client slidingWindow = getSlidingWindow(request);
+ client.getScheduler().onTimeout(sleepTime,
+ () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
+ LOG, () -> "Failed* to retry " + pending);
+ }
+
void assertRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {
Preconditions.assertSame(expectedAvailablePermits, requestSemaphore.availablePermits(), "availablePermits");
Preconditions.assertSame(expectedQueueLength, requestSemaphore.getQueueLength(), "queueLength");
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 989c00cbbc..275755514f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -21,12 +21,14 @@
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
@@ -56,6 +58,8 @@ static class DataStreamWindowRequest implements SlidingWindow.ClientSideRequest<
DataStreamRequest getDataStreamRequest() {
if (header.getDataLength() == 0) {
return new DataStreamRequestByteBuffer(header, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+ } else if (data instanceof ByteBuf) {
+ return new DataStreamRequestByteBuf(header, (ByteBuf)data);
} else if (data instanceof ByteBuffer) {
return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
} else if (data instanceof FilePositionCount) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ec16763c2c..db789aef2f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -44,11 +44,13 @@
import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
@@ -65,6 +67,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -79,16 +82,18 @@ public final class RaftClientImpl implements RaftClient {
.build();
public abstract static class PendingClientRequest {
- private final long creationTimeInMs = System.currentTimeMillis();
+ private final Timestamp creationTime = Timestamp.currentTime();
private final CompletableFuture replyFuture = new CompletableFuture<>();
private final AtomicInteger attemptCount = new AtomicInteger();
- private final Map, Integer> exceptionCount = new ConcurrentHashMap<>();
+ private final Map, Integer> exceptionCounts = new ConcurrentHashMap<>();
public abstract RaftClientRequest newRequestImpl();
final RaftClientRequest newRequest() {
- attemptCount.incrementAndGet();
- return newRequestImpl();
+ final int attempt = attemptCount.incrementAndGet();
+ final RaftClientRequest request = newRequestImpl();
+ LOG.debug("attempt #{}, newRequest {}", attempt, request);
+ return request;
}
CompletableFuture getReplyFuture() {
@@ -99,19 +104,10 @@ public int getAttemptCount() {
return attemptCount.get();
}
- int incrementExceptionCount(Throwable t) {
- return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1) : 0;
- }
-
- public int getExceptionCount(Throwable t) {
- return t != null ? Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0;
- }
-
- public boolean isRequestTimeout(TimeDuration timeout) {
- if (timeout == null) {
- return false;
- }
- return System.currentTimeMillis() - creationTimeInMs > timeout.toLong(TimeUnit.MILLISECONDS);
+ public ClientRetryEvent newClientRetryEvent(RaftClientRequest request, Throwable throwable) {
+ final int exceptionCount = throwable == null? 0
+ : exceptionCounts.compute(throwable.getClass(), (k, v) -> v == null? 1: v+1);
+ return new ClientRetryEvent(getAttemptCount(), request, exceptionCount, throwable, creationTime);
}
}
@@ -176,6 +172,7 @@ private synchronized Set getAndReset() {
private final RaftGroupId groupId;
private final RetryPolicy retryPolicy;
+ @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftPeerId leaderId;
/** The callIds of the replied requests. */
private final RepliedCallIds repliedCallIds;
@@ -194,6 +191,8 @@ private synchronized Set getAndReset() {
private final ConcurrentMap
leaderElectionManagement = new ConcurrentHashMap<>();
+ private final AtomicBoolean closed = new AtomicBoolean();
+
@SuppressWarnings("checkstyle:ParameterNumber")
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
RaftClientRpc clientRpc, RetryPolicy retryPolicy, RaftProperties properties, Parameters parameters) {
@@ -344,11 +343,11 @@ public DataStreamApi getDataStreamApi() {
return dataStreamApi.get();
}
- Throwable noMoreRetries(ClientRetryEvent event) {
+ IOException noMoreRetries(ClientRetryEvent event) {
final int attemptCount = event.getAttemptCount();
final Throwable throwable = event.getCause();
if (attemptCount == 1 && throwable != null) {
- return throwable;
+ return IOUtils.asIOException(throwable);
}
return new RaftRetryFailureException(event.getRequest(), attemptCount, retryPolicy, throwable);
}
@@ -416,8 +415,7 @@ void handleIOException(RaftClientRequest request, IOException ioe) {
void handleIOException(RaftClientRequest request, IOException ioe,
RaftPeerId newLeader, Consumer handler) {
- LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
- clientId, newLeader, request, ioe);
+ LOG.debug("{}: suggested new leader: {}. Failed {}", clientId, newLeader, request, ioe);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
@@ -454,8 +452,17 @@ public RaftClientRpc getClientRpc() {
return clientRpc;
}
+ boolean isClosed() {
+ return closed.get();
+ }
+
@Override
public void close() throws IOException {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+
+ LOG.debug("close {}", getId());
clientRpc.close();
if (dataStreamApi.isInitialized()) {
dataStreamApi.get().close();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
index 1762dc0e49..65c54d0f21 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
@@ -37,9 +37,10 @@ class SnapshotManagementImpl implements SnapshotManagementApi {
}
@Override
- public RaftClientReply create(long timeoutMs) throws IOException {
+ public RaftClientReply create(long creationGap, long timeoutMs) throws IOException {
final long callId = CallId.getAndIncrement();
return client.io().sendRequestWithRetry(() -> SnapshotManagementRequest.newCreate(client.getId(),
- Optional.ofNullable(server).orElseGet(client::getLeaderId), client.getGroupId(), callId, timeoutMs));
+ Optional.ofNullable(server).orElseGet(client::getLeaderId),
+ client.getGroupId(), callId, timeoutMs, creationGap));
}
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 84b817b581..eccda4dbdd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -22,6 +22,7 @@
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -89,11 +90,14 @@ static void sendRequestWithRetry(PendingClientRequest pending, RaftClientImpl cl
}
final Throwable cause = replyException != null ? replyException : e;
- pending.incrementExceptionCount(cause);
- final ClientRetryEvent event = new ClientRetryEvent(request, cause, pending);
+ if (client.isClosed()) {
+ f.completeExceptionally(new AlreadyClosedException(client + " is closed"));
+ return;
+ }
+
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, cause);
RetryPolicy retryPolicy = client.getRetryPolicy();
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
- TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
if (!action.shouldRetry()) {
f.completeExceptionally(client.noMoreRetries(event));
return;
@@ -124,7 +128,9 @@ static void sendRequestWithRetry(PendingClientRequest pending, RaftClientImpl cl
}
}
- LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request);
+ final TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
+ LOG.debug("schedule~ attempt #{} with sleep {} and policy {} for {}",
+ attemptCount, sleepTime, retryPolicy, request);
client.getScheduler().onTimeout(sleepTime,
() -> sendRequestWithRetry(pending, client), LOG, () -> clientId + ": Failed~ to retry " + request);
} catch (Exception ex) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
index f0c38efb96..c6a8beb06f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
@@ -17,12 +17,11 @@
*/
package org.apache.ratis.client.retry;
-import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
/** An {@link RetryPolicy.Event} specific to client request failure. */
public class ClientRetryEvent implements RetryPolicy.Event {
@@ -30,23 +29,15 @@ public class ClientRetryEvent implements RetryPolicy.Event {
private final int causeCount;
private final RaftClientRequest request;
private final Throwable cause;
- private PendingClientRequest pending;
+ private final Timestamp pendingRequestCreationTime;
- @VisibleForTesting
- public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
- this(attemptCount, request, attemptCount, cause);
- }
-
- public ClientRetryEvent(RaftClientRequest request, Throwable t, PendingClientRequest pending) {
- this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t);
- this.pending = pending;
- }
-
- private ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause) {
+ public ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause,
+ Timestamp pendingRequestCreationTime) {
this.attemptCount = attemptCount;
this.causeCount = causeCount;
this.request = request;
this.cause = cause;
+ this.pendingRequestCreationTime = pendingRequestCreationTime;
}
@Override
@@ -69,7 +60,7 @@ public Throwable getCause() {
}
+ /**
+ * @return true if the given timeout is non-null and the time elapsed since
+ * {@code pendingRequestCreationTime} is greater than or equal to it; false otherwise.
+ */
boolean isRequestTimeout(TimeDuration timeout) {
- return pending != null && pending.isRequestTimeout(timeout);
+ return timeout != null && pendingRequestCreationTime.elapsedTime().compareTo(timeout) >= 0;
}
@Override
@@ -77,6 +68,7 @@ public String toString() {
return JavaUtils.getClassSimpleName(getClass())
+ ":attempt=" + attemptCount
+ ",request=" + request
- + ",cause=" + cause;
+ + ",cause=" + cause
+ + ",causeCount=" + causeCount;
}
}
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index 9bb36bcce5..9205e81c2c 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -69,12 +69,5 @@
junit-jupiter-paramstest
-
-
- com.github.spotbugs
- spotbugs-annotations
- provided
- true
-
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
similarity index 96%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
rename to ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
index 2542b1ec6f..1873bec9b4 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.server;
+package org.apache.ratis.datastream.impl;
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
index 632fa65293..bfac81a2b0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
@@ -19,6 +19,7 @@
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import java.util.Collection;
@@ -33,25 +34,27 @@ public class GroupInfoReply extends RaftClientReply {
private final RoleInfoProto roleInfoProto;
private final boolean isRaftStorageHealthy;
private final RaftConfigurationProto conf;
+ private final LogInfoProto logInfoProto;
public GroupInfoReply(RaftClientRequest request, Collection commitInfos,
RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
- RaftConfigurationProto conf) {
+ RaftConfigurationProto conf, LogInfoProto logInfoProto) {
this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
request.getCallId(), commitInfos,
- group, roleInfoProto, isRaftStorageHealthy, conf);
+ group, roleInfoProto, isRaftStorageHealthy, conf, logInfoProto);
}
@SuppressWarnings("parameternumber")
public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
Collection commitInfos,
RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
- RaftConfigurationProto conf) {
+ RaftConfigurationProto conf, LogInfoProto logInfoProto) {
super(clientId, serverId, groupId, callId, true, null, null, 0L, commitInfos);
this.group = group;
this.roleInfoProto = roleInfoProto;
this.isRaftStorageHealthy = isRaftStorageHealthy;
this.conf = conf;
+ this.logInfoProto = logInfoProto;
}
public RaftGroup getGroup() {
@@ -69,4 +72,8 @@ public boolean isRaftStorageHealthy() {
public Optional getConf() {
return Optional.ofNullable(conf);
}
+
+ /**
+ * @return the {@link LogInfoProto} given to the constructor, returned as is
+ * (not wrapped in an {@link Optional} the way {@link #getConf()} is).
+ */
+ public LogInfoProto getLogInfoProto() {
+ return logInfoProto;
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 1985bbe667..222ccff057 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -46,11 +46,11 @@ default CompletableFuture submitClientRequestAsync(
ReferenceCountedObject requestRef) {
try {
// for backward compatibility
- return submitClientRequestAsync(requestRef.retain())
- .whenComplete((r, e) -> requestRef.release());
+ return submitClientRequestAsync(requestRef.retain());
} catch (Exception e) {
- requestRef.release();
return JavaUtils.completeExceptionally(e);
+ } finally {
+ requestRef.release();
}
}
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index ed41f1ea2c..18c157130b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -474,7 +474,13 @@ public long getTimeoutMs() {
@Override
public String toString() {
- return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
- + type + ", " + getMessage();
+ return toStringShort() + ", " + getMessage();
+ }
+
+ /**
+ * @return a short string which does not include {@link #message}.
+ */
+ public String toStringShort() {
+ return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", " + type;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index e7bfee3cd9..65a77855c4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -254,7 +254,9 @@ public String getDetails() {
@Override
public boolean equals(Object o) {
- return (o instanceof RaftPeer) && id.equals(((RaftPeer) o).getId());
+ // Two peers are equal when both their ids and their addresses match.
+ // The address comparison is null-safe: two null addresses compare equal, so that
+ // a peer without an address is still equal to itself (equals() must be reflexive).
+ return (o instanceof RaftPeer) && id.equals(((RaftPeer) o).getId()) &&
+ java.util.Objects.equals(address, ((RaftPeer) o).getAddress());
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index 8098039d1b..8db842d734 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.protocol;
-import javax.annotation.concurrent.Immutable;
import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
@@ -34,7 +33,6 @@
*
* This is a value-based class.
*/
-@Immutable
public final class RaftPeerId {
private static final Map BYTE_STRING_MAP = new ConcurrentHashMap<>();
private static final Map STRING_MAP = new ConcurrentHashMap<>();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
index 2ea2059b51..269fdfc591 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
@@ -24,7 +24,16 @@ public final class SnapshotManagementRequest extends RaftClientRequest {
public abstract static class Op {
}
- public static class Create extends Op {
+
+ public static final class Create extends Op {
+ private final long creationGap;
+ private Create(long creationGap) {
+ this.creationGap = creationGap;
+ }
+
+ public long getCreationGap() {
+ return creationGap;
+ }
@Override
public String toString() {
@@ -35,8 +44,13 @@ public String toString() {
+ /**
+ * Creates a snapshot-create request with a creation gap of 0.
+ * Delegates to the overload that takes an explicit {@code creationGap}.
+ */
public static SnapshotManagementRequest newCreate(ClientId clientId,
RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs) {
+ return newCreate(clientId, serverId, groupId, callId, timeoutMs, 0);
+ }
+
+ /**
+ * Creates a snapshot-create request whose {@link Create} op carries the given {@code creationGap}.
+ * Note that {@code creationGap} is the last parameter, after {@code timeoutMs}.
+ */
+ public static SnapshotManagementRequest newCreate(ClientId clientId,
+ RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs, long creationGap) {
return new SnapshotManagementRequest(clientId,
- serverId, groupId, callId, timeoutMs,new SnapshotManagementRequest.Create());
+ serverId, groupId, callId, timeoutMs, new SnapshotManagementRequest.Create(creationGap));
}
private final Op op;
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
index 9ccd66ad71..38dad5c499 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -35,7 +35,13 @@ public final class BatchLogger {
private BatchLogger() {
}
- public interface Key {}
+ /**
+ * The key type accepted by the {@code warn} methods.
+ * Each key supplies a batch duration, which is 5 seconds unless
+ * {@link #getBatchDuration()} is overridden.
+ */
+ public interface Key {
+ TimeDuration DEFAULT_DURATION = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+
+ default TimeDuration getBatchDuration() {
+ return DEFAULT_DURATION;
+ }
+ }
private static final class UniqueId {
private final Key key;
@@ -93,6 +99,10 @@ private synchronized boolean tryStartBatch(Consumer op) {
private static final TimeoutExecutor SCHEDULER = TimeoutExecutor.getInstance();
private static final ConcurrentMap LOG_CACHE = new ConcurrentHashMap<>();
+ /**
+ * Same as {@link #warn(Key, String, Consumer, TimeDuration)},
+ * using the batch duration returned by {@link Key#getBatchDuration()}.
+ */
+ public static void warn(Key key, String name, Consumer op) {
+ warn(key, name, op, key.getBatchDuration(), true);
+ }
+
public static void warn(Key key, String name, Consumer op, TimeDuration batchDuration) {
warn(key, name, op, batchDuration, true);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 00725903a7..7d1d75309a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -229,7 +229,7 @@ static RETURN attempt(
}
if (log != null && log.isWarnEnabled()) {
log.warn("FAILED \"" + name.get() + "\", attempt #" + i + "/" + numAttempts
- + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
+ + ", sleep " + sleepTime + " and then retry: " + t);
}
}
@@ -257,7 +257,6 @@ static void attemptUntilTrue(
}, numAttempts, sleepTime, name, log);
}
-
static Timer runRepeatedly(Runnable runnable, long delay, long period, TimeUnit unit) {
final Timer timer = new Timer(true);
timer.schedule(new TimerTask() {
@@ -283,6 +282,10 @@ static CompletableFuture completeExceptionally(Throwable t) {
return future;
}
+ /**
+ * @return true if the given future is done and was neither cancelled
+ * nor completed exceptionally; false otherwise (including when it is not yet done).
+ */
+ static boolean isCompletedNormally(CompletableFuture> future) {
+ return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
+ }
+
static Throwable unwrapCompletionException(Throwable t) {
Objects.requireNonNull(t, "t == null");
return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java b/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java
index 54f7989245..4554410488 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java
@@ -46,15 +46,16 @@ static ObjectName tryRegister(String name, Object mBean) {
}
/**
- * Try registering the mBean with the names one by one.
+ * Try registering the mxBean with the names one by one.
* @return the registered name, or, if it fails, return null.
*/
- public synchronized String register(Object mBean, Iterable> names) {
+ public synchronized String register(Object mxBean, Iterable> names) {
if (registeredName == null) {
for (Supplier supplier : names) {
final String name = supplier.get();
- registeredName = tryRegister(name, mBean);
+ registeredName = tryRegister(name, mxBean);
if (registeredName != null) {
+ LOG.info("register mxBean {} as {}", mxBean.getClass(), name);
return name;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
new file mode 100644
index 0000000000..82202f2884
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
+ * observe resource object life-cycle and assert proper resource closure before they are GCed.
+ *
+ *
+ * Example usage:
+ *
+ *
{@code
+ * class MyResource implements AutoCloseable {
+ * static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource");
+ *
+ * private final UncheckedAutoCloseable leakTracker = LEAK_DETECTOR.track(this, () -> {
+ * // report leaks, don't refer to the original object (MyResource) here.
+ * System.out.println("MyResource is not closed before being discarded.");
+ * });
+ *
+ * @Override
+ * public void close() {
+ * // proper resources cleanup...
+ * // inform tracker that this object is closed properly.
+ * leakTracker.close();
+ * }
+ * }
+ *
+ * }
+ */
+public class LeakDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);
+ private static final AtomicLong COUNTER = new AtomicLong();
+
+ private final ReferenceQueue