From 05bef916c2f91019108f38934c4b333b5882edac Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 28 May 2024 14:10:33 +0300 Subject: [PATCH] Simplify the batching of V2 AddResponses - use messages as part of the pipeline to avoid ordering issues and to simplify the way how batching is implemented without the need to use synchronized (blocking) methods --- .../bookkeeper/proto/BookieProtoEncoding.java | 47 +++++++++++++------ .../proto/BookieRequestHandler.java | 31 +++--------- 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 0c3b7bf8e8e..97e6d9b2dc2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -36,6 +36,9 @@ import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; +import org.apache.bookkeeper.proto.BookieProtocol.AddResponse; import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader; import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; @@ -265,7 +268,7 @@ public ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) { this.extensionRegistry = extensionRegistry; } - private static final int RESPONSE_HEADERS_SIZE = 24; + public static final int RESPONSE_HEADERS_SIZE = 24; @Override public Object encode(Object msg, ByteBufAllocator allocator) @@ -334,11 +337,7 @@ public Object encode(Object msg, ByteBufAllocator allocator) } } else if (msg instanceof BookieProtocol.AddResponse) { ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame size */); - buf.writeInt(RESPONSE_HEADERS_SIZE); - buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0)); - buf.writeInt(r.getErrorCode()); - buf.writeLong(r.getLedgerId()); - buf.writeLong(r.getEntryId()); + serializeAddResponseInto((AddResponse) r, buf); return buf; } else if (msg instanceof BookieProtocol.AuthResponse) { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage(); @@ -358,6 +357,7 @@ public Object encode(Object msg, ByteBufAllocator allocator) r.recycle(); } } + @Override public Object decode(ByteBuf buffer) throws Exception { @@ -411,12 +411,12 @@ public Object decode(ByteBuf buffer) } } - public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) { - buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size - buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0)); - buf.writeInt(rc); // rc-code - buf.writeLong(req.getLedgerId()); - buf.writeLong(req.getEntryId()); + public static void serializeAddResponseInto(AddResponse addResponse, ByteBuf buf) { + buf.writeInt(RESPONSE_HEADERS_SIZE); + buf.writeInt(PacketHeader.toInt(addResponse.getProtocolVersion(), addResponse.getOpCode(), (short) 0)); + buf.writeInt(addResponse.getErrorCode()); + buf.writeLong(addResponse.getLedgerId()); + buf.writeLong(addResponse.getEntryId()); } } @@ -575,8 +575,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception /** * A response message encoder. */ - @Sharable public static class ResponseEncoder extends ChannelOutboundHandlerAdapter { + public static final Object MSG_FLUSH_PENDING_ADD_RESPONSES = new Object(); + private final List pendingAddResponses = new ArrayList<>(); final EnDecoder repPreV3; final EnDecoder repV3; @@ -591,7 +592,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) LOG.trace("Encode response {} to channel {}.", msg, ctx.channel()); } - if (msg instanceof ByteBuf) { + if (msg == MSG_FLUSH_PENDING_ADD_RESPONSES) { + flushPendingAddResponses(ctx, promise); + } else if (msg instanceof BookieProtocol.AddResponse) { + pendingAddResponses.add((BookieProtocol.AddResponse) msg); + } else if (msg instanceof ByteBuf) { ctx.write(msg, promise); } else if (msg instanceof BookkeeperProtocol.Response) { ctx.write(repV3.encode(msg, ctx.alloc()), promise); @@ -602,6 +607,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.write(msg, promise); } } + + private void flushPendingAddResponses(ChannelHandlerContext ctx, ChannelPromise promise) { + if (pendingAddResponses.isEmpty()) { + return; + } + int serializedSize = pendingAddResponses.size() * (ResponseEnDeCoderPreV3.RESPONSE_HEADERS_SIZE + 4); + ByteBuf buf = ctx.alloc().directBuffer(serializedSize); + for (AddResponse addResponse : pendingAddResponses) { + ResponseEnDeCoderPreV3.serializeAddResponseInto(addResponse, buf); + addResponse.recycle(); + } + pendingAddResponses.clear(); + ctx.writeAndFlush(buf, promise); + } } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 3d906dba449..fb468d5601e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.proto; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; @@ -28,23 +27,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEncoder; /** * Serverside handler for bookkeeper requests. */ @Slf4j public class BookieRequestHandler extends ChannelInboundHandlerAdapter { - - private static final int DEFAULT_PENDING_RESPONSE_SIZE = 256; - private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; private ChannelHandlerContext ctx; - private ByteBuf pendingSendResponses = null; - private int maxPendingResponsesSize = DEFAULT_PENDING_RESPONSE_SIZE; - BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; @@ -90,24 +84,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception requestProcessor.processRequest(msg, this); } - public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) { - if (pendingSendResponses == null) { - pendingSendResponses = ctx().alloc().directBuffer(maxPendingResponsesSize); - } - BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses); + public void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) { + BookieProtocol.AddResponse response = BookieProtocol.AddResponse.create(req.getProtocolVersion(), rc, + req.getLedgerId(), req.getEntryId()); + ctx().pipeline().write(response, ctx().voidPromise()); } - public synchronized void flushPendingResponse() { - if (pendingSendResponses != null) { - maxPendingResponsesSize = (int) Math.max( - maxPendingResponsesSize * 0.5 + 0.5 * pendingSendResponses.readableBytes(), - DEFAULT_PENDING_RESPONSE_SIZE); - if (ctx().channel().isActive()) { - ctx().writeAndFlush(pendingSendResponses, ctx.voidPromise()); - } else { - pendingSendResponses.release(); - } - pendingSendResponses = null; - } + public void flushPendingResponse() { + ctx().pipeline().writeAndFlush(ResponseEncoder.MSG_FLUSH_PENDING_ADD_RESPONSES, ctx().voidPromise()); } }