From 20ad3ec7113ff7bfcf7644b18deed6df03c5795c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 20 Feb 2025 10:36:37 +0800 Subject: [PATCH 1/3] fix --- .../java/org/apache/bookkeeper/proto/BookieClientImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index a12d9fd64d5..4ed3a837705 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -411,7 +411,9 @@ public void operationComplete(final int rc, PerChannelBookieClient pcbc) { try { if (rc != BKException.Code.OK) { - bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); + bookieClient.executor.executeOrdered(ledgerId, () -> { + bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); + }); } else { pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, allowFastFail, writeFlags); From 1632cabc94da052700e83ca81204fbbce1fb668e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 20 Feb 2025 18:51:23 +0800 Subject: [PATCH 2/3] improve test --- .../bookkeeper/proto/BookieClientImpl.java | 9 +- .../proto/PerChannelBookieClient.java | 8 +- .../proto/ClientSocketDisconnectTest.java | 144 ++++++++++++++++++ 3 files changed, 157 insertions(+), 4 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index 4ed3a837705..37048ac82e0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.protobuf.ExtensionRegistry; import io.netty.buffer.ByteBuf; @@ -370,7 +371,9 @@ private void completeBatchRead(final int rc, } } - private static class ChannelReadyForAddEntryCallback + // Without test, this class should be modifier with "private". + @VisibleForTesting + static class ChannelReadyForAddEntryCallback implements GenericCallback { private final Handle recyclerHandle; @@ -380,7 +383,9 @@ private static class ChannelReadyForAddEntryCallback private long entryId; private BookieId addr; private Object ctx; - private WriteCallback cb; + // Without test, this class should be modifier with "private". + @VisibleForTesting + WriteCallback cb; private int options; private byte[] masterKey; private boolean allowFastFail; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index a77be0ca3fe..019aeb0637f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -616,11 +616,15 @@ protected void initChannel(Channel ch) throws Exception { } ChannelFuture future = bootstrap.connect(bookieAddr); - future.addListener(contextPreservingListener(new ConnectionFutureListener(startTime))); - future.addListener(x -> makeWritable()); + addChannelListeners(future, startTime); return future; } + protected void addChannelListeners(ChannelFuture future, long connectStartTime) { + future.addListener(contextPreservingListener(new ConnectionFutureListener(connectStartTime))); + future.addListener(x -> makeWritable()); + } + void cleanDisconnectAndClose() { disconnect(); close(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java new file mode 100644 index 00000000000..2b4eb74397b --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.proto; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.BookKeeper; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.tls.SecurityException; +import org.apache.bookkeeper.util.EventLoopUtil; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +@Slf4j +public class ClientSocketDisconnectTest extends BookKeeperClusterTestCase { + + public ClientSocketDisconnectTest() { + super(1); + this.useUUIDasBookieId = true; + } + + public static class PerChannelBookieClientDecorator extends PerChannelBookieClient { + + private final ThreadCounter threadCounter; + private final AtomicInteger failurePredicate = new AtomicInteger(); + + public PerChannelBookieClientDecorator(PerChannelBookieClient client, BookieId addr, ThreadCounter tCounter) + throws SecurityException { + super(client.executor, client.eventLoopGroup, addr, client.bookieAddressResolver); + this.threadCounter = tCounter; + } + + // Inject a disconnection per two connections. + protected void addChannelListeners(ChannelFuture future, long connectStartTime) { + future.addListener((ChannelFutureListener) future1 -> { + if (failurePredicate.incrementAndGet() % 2 == 1) { + future1.channel().close(); + } + }); + super.addChannelListeners(future, connectStartTime); + } + + // Records the thread who running "PendingAddOp.writeComplete". + @Override + protected void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback op) { + BookieClientImpl.ChannelReadyForAddEntryCallback callback = + (BookieClientImpl.ChannelReadyForAddEntryCallback) op; + BookkeeperInternalCallbacks.WriteCallback originalCallback = callback.cb; + callback.cb = (rc, ledgerId, entryId, addr, ctx) -> { + threadCounter.record(); + originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx); + }; + super.connectIfNeededAndDoOp(op); + } + } + + private static class ThreadCounter { + + private final Map records = new ConcurrentHashMap<>(); + + public void record() { + Thread currentThread = Thread.currentThread(); + records.computeIfAbsent(currentThread, k -> new AtomicInteger()); + records.get(currentThread).incrementAndGet(); + } + } + + @Test + public void testAddEntriesCallbackWithBKClientThread() throws Exception { + // Create BKC and a ledger handle. + ClientConfiguration conf = new ClientConfiguration(); + conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + org.apache.bookkeeper.client.BookKeeper bkc = + (org.apache.bookkeeper.client.BookKeeper) BookKeeper.newBuilder(conf) + .eventLoopGroup( + EventLoopUtil.getClientEventLoopGroup(conf, new DefaultThreadFactory("test-io"))) + .build(); + final BookieClientImpl bookieClient = (BookieClientImpl) bkc.getClientCtx().getBookieClient(); + LedgerHandle lh = (LedgerHandle) bkc.newCreateLedgerOp() + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withAckQuorumSize(1) + .withDigestType(DigestType.CRC32C) + .withPassword(new byte[0]) + .execute().join(); + + // Inject two operations. + // 1. Inject a disconnection when connecting successfully. + // 2. Records the thread who running "PendingAddOp.writeComplete". + final ThreadCounter callbackThreadRecorder = new ThreadCounter(); + List ensemble = lh.getLedgerMetadata() + .getAllEnsembles().entrySet().iterator().next().getValue(); + DefaultPerChannelBookieClientPool clientPool = + (DefaultPerChannelBookieClientPool) bookieClient.lookupClient(ensemble.get(0)); + PerChannelBookieClient[] clients = clientPool.clients; + + // Write 100 entries and wait for finishing. + for (int i = 0; i < clients.length; i++) { + clients[i] = new PerChannelBookieClientDecorator(clients[i], ensemble.get(0), callbackThreadRecorder); + } + int addCount = 1000; + CountDownLatch countDownLatch = new CountDownLatch(addCount); + for (int i = 0; i < addCount; i++) { + lh.asyncAddEntry(new byte[]{1}, (rc, lh1, entryId, ctx) -> { + countDownLatch.countDown(); + }, i); + } + countDownLatch.await(); + + // Verify: all callback will run in the "BookKeeperClientWorker" thread. + for (Thread callbackThread : callbackThreadRecorder.records.keySet()) { + Assert.assertTrue(callbackThread.getName(), callbackThread.getName().startsWith("BookKeeperClientWorker")); + } + } +} \ No newline at end of file From 61b6a445a42e8e20871a1a2f9ec23cc52fb6ddf3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 14 May 2025 00:02:03 +0800 Subject: [PATCH 3/3] fix tests --- .../bookkeeper/proto/BookieClientImpl.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index 37048ac82e0..bd04e8b5cc4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -414,19 +414,24 @@ static ChannelReadyForAddEntryCallback create( @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - try { - if (rc != BKException.Code.OK) { - bookieClient.executor.executeOrdered(ledgerId, () -> { + if (rc != BKException.Code.OK) { + bookieClient.executor.executeOrdered(ledgerId, () -> { + try { bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); - }); - } else { + } finally { + ReferenceCountUtil.release(toSend); + } + recycle(); + }); + } else { + try { pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, allowFastFail, writeFlags); + } finally { + ReferenceCountUtil.release(toSend); } - } finally { - ReferenceCountUtil.release(toSend); + recycle(); } - recycle(); } private ChannelReadyForAddEntryCallback(