Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.github.ambry.account.Account;
import com.github.ambry.account.AccountService;
import com.github.ambry.account.Container;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterMapUtils;
import com.github.ambry.commons.ByteBufferAsyncWritableChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -574,10 +575,23 @@ void setOperationCompleted() {

/**
* Clean up the chunks to release any data buffer. This should be invoked when terminating the operation with
* an exception.
* an exception. This method also closes the chunkFillerChannel to fire any pending callbacks, ensuring the original
* buffer from the ReadableStreamChannel is properly released. Synchronized for memory visibility on channelReadBuf.
* The contract upheld by PutManager is that this method is called AT-MOST-ONCE.
*/
public void cleanupChunks() {
releaseDataForAllChunks();
public synchronized void cleanupChunks() {
try {
releaseDataForAllChunks();
} finally {
// Release the extra reference we retained when storing in channelReadBuf.
if (channelReadBuf != null) {
channelReadBuf.release();
channelReadBuf = null;
}
// Close the chunkFillerChannel to fire any remaining callbacks in chunksAwaitingResolution.
// This ensures the original buffer (owned by the callback) is released and not leaked.
chunkFillerChannel.close();
Comment on lines +591 to +593
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two cases we need to close the chunkFillerChannel to clean up the buffered chunks.

  1. Client close the connection before sending all the bytes
  2. PutOperation failed to write to ambry servers.

In both cases, an exception would be send back to the NettyResponseChannel to close NettyRequest. When NettyRequest is closed, it would call the callback method we passed to int readInto method, which would close the chunk filler channel.

ByteBufferAsyncWritableChannel has an atomic boolean variable to make sure the channel is closed only once, so calling close twice won't do any harm. But it's also not necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the order matters here.

  • cleanupChunks() may fire before the readInto callback
  • channelReadBuf's retain is protecting memory in chunks in the chunkFillerChannel

Ergo, only where the chunkFillerChannel is closed should channelReadBuf be released.

If what you assert is true, that readInto callback is always called, we should just move all of cleanupChunks() logic into the readInto callback and delete cleanupChunks outright. That would fully delegate responsibility and remove this confusion over who's job is it to do what.

If the readInto callback isn't always called, then we need to close it here when channelReadBuf is released. Let me know which you'd prefer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving cleanupChunks to readnto callback is not a bad idea. I would prefer it that way. This way, we can make it a private method and don't have to call it from PutManager. It's probably better in this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did more work on this and existing test suite makes it very clear: the system expects that the readInto callback doesn't expect chunks to be cleaned up. I think the invariant we originally had here is correct:

  1. PutOperation must retain the slices memory it's using in chunks until those chunks are cleaned up.
  2. Once the chunks are cleaned up, the channel must be closed and slices of memory released.
  3. The channel could close concurrently with the chunks being operated on.

readInto doesn't have a guarantee that it runs after the PutOperation is done with the chunks, so it can't clean up the chunks. Ergo, I the current PR commit is the right implementation.

}
}

/**
Expand Down Expand Up @@ -676,13 +690,18 @@ boolean isChunkFillingDone() {
* chunkFillerChannel, if there is any.
* @throws InterruptedException if the call to get a chunk from the chunkFillerChannel is interrupted.
*/
void fillChunks() {
synchronized void fillChunks() {
try {
PutChunk chunkToFill;
while (!isChunkFillingDone()) {
// Attempt to fill a chunk
if (channelReadBuf == null) {
channelReadBuf = chunkFillerChannel.getNextByteBuf(0);
if (channelReadBuf != null) {
// Retain the buffer to protect against the channel callback releasing it
// while we still hold a reference we're processing.
channelReadBuf.retain();
}
}
if (channelReadBuf != null) {
if (channelReadBuf.readableBytes() > 0 && isChunkAwaitingResolution()) {
Expand All @@ -707,8 +726,13 @@ void fillChunks() {
routerCallback.onPollReady();
}
if (!channelReadBuf.isReadable()) {
chunkFillerChannel.resolveOldestChunk(null);
channelReadBuf = null;
try {
chunkFillerChannel.resolveOldestChunk(null);
} finally {
// Release the reference we retained when storing getNextByteBuf, even if resolveOldestChunk throws.
channelReadBuf.release();
channelReadBuf = null;
}
}
}
} else {
Expand Down Expand Up @@ -1096,16 +1120,19 @@ boolean isStitchOperation() {
}

/**
* Set the exception associated with this operation.
* First, if current operationException is null, directly set operationException as exception;
* Second, if operationException exists, compare ErrorCodes of exception and existing operation Exception depending
* on precedence level. An ErrorCode with a smaller precedence level overrides an ErrorCode with a larger precedence
* level. Update the operationException if necessary.
* @param exception the {@link RouterException} to possibly set.
* Set the exception associated with this operation and mark it complete.
* For {@link RouterException}: uses precedence-based replacement where lower precedence
* levels override higher ones.
* For {@link java.nio.channels.ClosedChannelException}: only set if no other exception has
* been set to avoid overwriting meaningful errors.
* For all others simply set the exception as we don't know what they are or how to classify them.
* @param exception the {@link Exception} to possibly set.
*/
void setOperationExceptionAndComplete(Exception exception) {
if (exception instanceof RouterException) {
RouterUtils.replaceOperationException(operationException, (RouterException) exception, this::getPrecedenceLevel);
} else if (exception instanceof ClosedChannelException) {
operationException.compareAndSet(null, exception);
} else {
operationException.set(exception);
}
Expand Down Expand Up @@ -1642,7 +1669,7 @@ void onFillComplete(boolean updateMetric) {
* @param channelReadBuf the {@link ByteBuf} from which to read data.
* @return the number of bytes transferred in this operation.
*/
synchronized int fillFrom(ByteBuf channelReadBuf) {
int fillFrom(ByteBuf channelReadBuf) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still need this function to be synchronized here, it's here to protect race condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets walk through to see if that's the case.

fillFrom is called in only one place, from fillChunks which itself is only called from ChunkFiller in PutManager. ChunkFiller is a runnable run by a single thread:

    chunkFillerThread = Utils.newThread("ChunkFillerThread-" + suffix, new ChunkFiller(), true);
    chunkFillerThread.start();

So fillChunks since it's only accessed by a single thread would only needs to be synchronized from concurrent access from error / cleanup threads. What is needed is that any objects used within the fillChunks routine which may also be concurrently accessed by those threads to be either behind a more narrowly scoped lock or declared as volatile. So lets look at that.

In PutManager.poll we have:

    for (PutOperation op : putOperations) {
      try {
        op.poll(requestRegistrationCallback);
      } catch (Exception e) {
        op.setOperationExceptionAndComplete(
            new RouterException("Put poll encountered unexpected error", e, RouterErrorCode.UnexpectedInternalError));
      }
      if (op.isOperationComplete() && putOperations.remove(op)) {
        // In order to ensure that an operation is completed only once, call onComplete() only at the place where the
        // operation actually gets removed from the set of operations. See comment within closePendingOperations().
        onComplete(op);
      }
    }

So

  1. setOperationExceptionAndComplete may be set with a RouterException.
  2. If isOperationComplete is true, then onComplete may be called which calls cleanupChunks

Therefore we need to

a) make sure anything that happens within setOperationExceptionAndComplete is not concurrent with anything that happens in fillChunks.
b) make sure that either i) nothing concurrent happens in fillChunks after isOperationComplete is true or ii) make sure what does happen is behind a lock.

In PutManager.completePendingOperations we have:

    for (PutOperation op : putOperations) {
      // There is a rare scenario where the operation gets removed from this set and gets completed concurrently by
      // the RequestResponseHandler thread when it is in poll() or handleResponse(). In order to avoid the completion
      // from happening twice, complete it here only if the remove was successful.
      if (putOperations.remove(op)) {
        op.cleanupChunks();
        Exception e = new RouterException("Aborted operation because Router is closed.", RouterErrorCode.RouterClosed);
        routerMetrics.operationDequeuingRate.mark();
        routerMetrics.operationAbortCount.inc();
        routerMetrics.onPutBlobError(e, op.isEncryptionEnabled(), op.isStitchOperation());
        nonBlockingRouter.completeOperation(op.getFuture(), op.getCallback(), null, e);
      }
    }

and completePendingOperations only runs as cleanup within the Chunkfiller thread, proving that it cannot be concurrent with fillChunks

So lets look at condition a:

  void setOperationExceptionAndComplete(Exception exception) {
    if (exception instanceof RouterException) {
      RouterUtils.replaceOperationException(operationException, (RouterException) exception, this::getPrecedenceLevel);
    } else if (exception instanceof ClosedChannelException) {
      operationException.compareAndSet(null, exception);
    } else {
      operationException.set(exception);
    }
    setOperationCompleted();
  }
  
  ...
  
    void setOperationCompleted() {
    operationCompleted = true;
    clearReadyChunks();
  }
  
  ...
  
    private synchronized void clearReadyChunks() {
    for (PutChunk chunk : putChunks) {
      logger.debug("{}: Chunk {} state: {}", loggingContext, chunk.getChunkIndex(), chunk.getState());
      // Only release the chunk in ready or complete mode. Filler thread will release the chunk in building mode
      // and the encryption thread will release the chunk in encrypting mode.
      if (chunk.isReady() || chunk.isComplete()) {
        chunk.clear();
      }
    }
  }

So for condition A we set the exception, set operation completed, and clear chunks which are provably finished. None of this will involve concurrent modification with fillChunks.

Lets looks at condition b:

fillChunks() {
  // a lot of channelReadBuf and chunk modification!
}

For condition b we can either add synchronized to fillChunks (instead of fillFrom) or we can add a lock around updating the operationCompleted value (and when we need to avoid TOCTOU). The synchronized on fillChunks should cause the least amount of complexity without too large an of an overhead as most of the work in fillChunks happens within an internal loop depending on the operationCompleted value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you added synchronized to both cleanupChunks and fillChunk method, these should be enough to prevent race condition from happening.

int toWrite;
ByteBuf slice;
if (buf == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.github.ambry.router;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.account.Account;
import com.github.ambry.account.Container;
import com.github.ambry.clustermap.DataNodeId;
Expand All @@ -22,6 +23,7 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.commons.BlobId;
import com.github.ambry.commons.ByteBufReadableStreamChannel;
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.commons.Callback;
import com.github.ambry.commons.LoggingNotificationSystem;
Expand All @@ -37,7 +39,6 @@
import com.github.ambry.frontend.Operations;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.named.NamedBlobRecord;
import com.github.ambry.network.NetworkClient;
import com.github.ambry.network.NetworkClientErrorCode;
import com.github.ambry.network.NetworkClientFactory;
Expand Down Expand Up @@ -97,6 +98,8 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.sql.DataSource;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -4550,4 +4553,52 @@ static void verifyRepairRequestRecordInDb(MysqlRepairRequestsDb db, BlobId blobI
assertEquals(expectedRecord.getExpirationTimeMs(), record.getExpirationTimeMs());
}
}

/**
* Test for bytebuf memory leaks in PutOperation when operations are aborted in the middle of a put operation.
* This test verifies that PutOperation properly releases bytebuf when the operation completes/fails, even if
* the ChunkFiller thread hasn't processed some data yet.
*/
@Test
public void testPutOperationByteBufLeakOnAbort() throws Exception {
NettyByteBufLeakHelper testLeakHelper = new NettyByteBufLeakHelper();
testLeakHelper.beforeTest();

Properties props = getNonBlockingRouterProperties(localDcName);
int chunkSize = 512;
props.setProperty("router.max.put.chunk.size.bytes", Integer.toString(chunkSize));
setRouter(props, mockServerLayout, new LoggingNotificationSystem());

// Configure servers to succeed for first few chunks, then fail
List<ServerErrorCode> serverErrorList = new ArrayList<>();
serverErrorList.add(ServerErrorCode.NoError);
serverErrorList.add(ServerErrorCode.NoError);
for (int i = 0; i < 100; i++) {
serverErrorList.add(ServerErrorCode.PartitionReadOnly);
}
mockServerLayout.getMockServers().forEach(server -> server.setServerErrors(serverErrorList));

// The first two will run normally, but 3+ will get ServerErrorCode.PartitionReadOnly
int blobSize = 100 * chunkSize;
byte[] blobData = new byte[blobSize];
ThreadLocalRandom.current().nextBytes(blobData);
ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(blobSize);
pooledBuf.writeBytes(blobData);
ByteBufReadableStreamChannel channel = new ByteBufReadableStreamChannel(pooledBuf);

BlobProperties blobProperties = new BlobProperties(blobSize, "serviceId", "ownerId", "contentType",
false, Utils.Infinite_Time, Utils.getRandomShort(ThreadLocalRandom.current()),
Utils.getRandomShort(ThreadLocalRandom.current()), false, null, null, null);

try {
router.putBlob(blobProperties, new byte[10], channel, PutBlobOptions.DEFAULT).get();
} catch (ExecutionException e) {
// Expected for operations that hit error responses
}
// If there are leaks, it will be detected in NettyByteBufLeakHelper and fail the test.
// Should be called before router close as closing of the router shouldn't be required to prevent leaks.
testLeakHelper.afterTest();
router.close();
router = null;
}
}
Loading
Loading