diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index 3197165827a..55bd3e31225 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -101,6 +101,10 @@ public synchronized void close() throws IOException { if (closed) { return; } + + super.close(); + writeBufferStartPosition.set(0); + ReferenceCountUtil.release(writeBuffer); fileChannel.close(); closed = true; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 22f5a81690d..3ac5a22c197 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.nio.channels.FileChannel; @@ -44,6 +45,8 @@ public class BufferedReadChannel extends BufferedChannelBase { long invocationCount = 0; long cacheHitCount = 0; + private boolean closed = false; + public BufferedReadChannel(FileChannel fileChannel, int readCapacity) { super(fileChannel); this.readCapacity = readCapacity; @@ -103,4 +106,17 @@ public synchronized void clear() { readBuffer.clear(); } + public synchronized void close() throws IOException { + if (closed) { + return; + } + + readBufferStartPosition = Long.MIN_VALUE; + ReferenceCountUtil.release(readBuffer); + + // BufferedReadChannel is not response for fileChannel close. + + closed = true; + } + } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 64c6508d67d..2ecb9fcb413 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -899,6 +899,7 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti // We set the position of the write buffer of this buffered channel to Long.MAX_VALUE // so that there are no overlaps with the write buffer while reading fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes()); + putInReadChannels(entryLogId, fc); return fc; }