From 176611a155fecad24ef44a72efc32af9b156a51b Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Sat, 25 Feb 2023 00:38:58 +0000 Subject: [PATCH] avoid consuming receive buffers when blocked by queue By disabling auto-read and triggering reads when a channel becomes active or when the previous read on a channel has completed, we avoid pulling bytes out of our TCP receive buffer prematurely, which allows the normal mechanisms of TCP back-pressure to work as expected. When auto-read enabled, an input experiencing back-pressure from the pipeline's queue would effectively avoid propagating that back-pressure by continuing to transfer bytes from the receive buffer into effectively unlimited direct memory buffers until it reached netty's memory allocation limit or an OOM was reached by allocating more memory than was available. It is likely that disabling auto-read will have some negative effect on throughput with the default-size receive buffers, and that we will also need to tune these or provide a way to make them user-tunable. --- src/main/java/org/logstash/tcp/InputLoop.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/logstash/tcp/InputLoop.java b/src/main/java/org/logstash/tcp/InputLoop.java index 8229517..1731da3 100644 --- a/src/main/java/org/logstash/tcp/InputLoop.java +++ b/src/main/java/org/logstash/tcp/InputLoop.java @@ -132,12 +132,17 @@ private static final class InputHandler extends ChannelInitializerIt requires the channel to be configured without {@code AUTO_READ}

+ */ + private static final class ThrottleReleaseHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + ctx.channel().read(); + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + ctx.channel().read(); + } + } + /** * Listeners that flushes the the JRuby supplied {@link Decoder} when the socket is closed. */