Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import org.apache.bookkeeper.proto.BookieProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
Expand Down Expand Up @@ -265,7 +268,7 @@ public ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
this.extensionRegistry = extensionRegistry;
}

private static final int RESPONSE_HEADERS_SIZE = 24;
public static final int RESPONSE_HEADERS_SIZE = 24;

@Override
public Object encode(Object msg, ByteBufAllocator allocator)
Expand Down Expand Up @@ -334,11 +337,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
}
} else if (msg instanceof BookieProtocol.AddResponse) {
ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame size */);
buf.writeInt(RESPONSE_HEADERS_SIZE);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
serializeAddResponseInto((AddResponse) r, buf);
return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
Expand All @@ -358,6 +357,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
r.recycle();
}
}

@Override
public Object decode(ByteBuf buffer)
throws Exception {
Expand Down Expand Up @@ -411,12 +411,12 @@ public Object decode(ByteBuf buffer)
}
}

public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) {
buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size
buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0));
buf.writeInt(rc); // rc-code
buf.writeLong(req.getLedgerId());
buf.writeLong(req.getEntryId());
public static void serializeAddResponseInto(AddResponse addResponse, ByteBuf buf) {
buf.writeInt(RESPONSE_HEADERS_SIZE);
buf.writeInt(PacketHeader.toInt(addResponse.getProtocolVersion(), addResponse.getOpCode(), (short) 0));
buf.writeInt(addResponse.getErrorCode());
buf.writeLong(addResponse.getLedgerId());
buf.writeLong(addResponse.getEntryId());
}
}

Expand Down Expand Up @@ -575,8 +575,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
/**
* A response message encoder.
*/
@Sharable
public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
public static final Object MSG_FLUSH_PENDING_ADD_RESPONSES = new Object();
private final List<AddResponse> pendingAddResponses = new ArrayList<>();
final EnDecoder repPreV3;
final EnDecoder repV3;

Expand All @@ -591,7 +592,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
}

if (msg instanceof ByteBuf) {
if (msg == MSG_FLUSH_PENDING_ADD_RESPONSES) {
flushPendingAddResponses(ctx, promise);
} else if (msg instanceof BookieProtocol.AddResponse) {
pendingAddResponses.add((BookieProtocol.AddResponse) msg);
} else if (msg instanceof ByteBuf) {
ctx.write(msg, promise);
} else if (msg instanceof BookkeeperProtocol.Response) {
ctx.write(repV3.encode(msg, ctx.alloc()), promise);
Expand All @@ -602,6 +607,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ctx.write(msg, promise);
}
}

private void flushPendingAddResponses(ChannelHandlerContext ctx, ChannelPromise promise) {
if (pendingAddResponses.isEmpty()) {
return;
}
int serializedSize = pendingAddResponses.size() * (ResponseEnDeCoderPreV3.RESPONSE_HEADERS_SIZE + 4);
ByteBuf buf = ctx.alloc().directBuffer(serializedSize);
for (AddResponse addResponse : pendingAddResponses) {
ResponseEnDeCoderPreV3.serializeAddResponseInto(addResponse, buf);
addResponse.recycle();
}
pendingAddResponses.clear();
ctx.writeAndFlush(buf, promise);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,25 @@
*/
package org.apache.bookkeeper.proto;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import java.nio.channels.ClosedChannelException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEncoder;

/**
* Serverside handler for bookkeeper requests.
*/
@Slf4j
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {

private static final int DEFAULT_PENDING_RESPONSE_SIZE = 256;

private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;

private ChannelHandlerContext ctx;

private ByteBuf pendingSendResponses = null;
private int maxPendingResponsesSize = DEFAULT_PENDING_RESPONSE_SIZE;

BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
this.requestProcessor = processor;
this.allChannels = allChannels;
Expand Down Expand Up @@ -90,24 +84,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
requestProcessor.processRequest(msg, this);
}

public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
if (pendingSendResponses == null) {
pendingSendResponses = ctx().alloc().directBuffer(maxPendingResponsesSize);
}
BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses);
public void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
BookieProtocol.AddResponse response = BookieProtocol.AddResponse.create(req.getProtocolVersion(), rc,
req.getLedgerId(), req.getEntryId());
ctx().pipeline().write(response, ctx().voidPromise());
}

public synchronized void flushPendingResponse() {
if (pendingSendResponses != null) {
maxPendingResponsesSize = (int) Math.max(
maxPendingResponsesSize * 0.5 + 0.5 * pendingSendResponses.readableBytes(),
DEFAULT_PENDING_RESPONSE_SIZE);
if (ctx().channel().isActive()) {
ctx().writeAndFlush(pendingSendResponses, ctx.voidPromise());
} else {
pendingSendResponses.release();
}
pendingSendResponses = null;
}
public void flushPendingResponse() {
ctx().pipeline().writeAndFlush(ResponseEncoder.MSG_FLUSH_PENDING_ADD_RESPONSES, ctx().voidPromise());
}
}
Loading