Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,48 +149,43 @@ public final class CommandMessage extends RequestMessage {
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` sections.
*/
BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
CompositeByteBuf byteBuf = new CompositeByteBuf(bsonOutput.getByteBuffers());
try {
CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers);
try {
byteBuf.position(firstDocumentPosition);
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);

// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
if (byteBuf.hasRemaining()) {
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();

// Each loop iteration processes one Document Sequence
// When there are no more bytes remaining, there are no more Document Sequences
while (byteBuf.hasRemaining()) {
// skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
byteBuf.position(byteBuf.position() + 1);
int sequenceStart = byteBuf.position();
int sequenceSizeInBytes = byteBuf.getInt();
int sectionEnd = sequenceStart + sequenceSizeInBytes;

String fieldName = getSequenceIdentifier(byteBuf);
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
// so, this method will need to change in order to append the value to the correct nested document.
assertFalse(fieldName.contains("."));

ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
try {
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
} finally {
documentsByteBufSlice.release();
}
byteBuf.position(sectionEnd);
byteBuf.position(firstDocumentPosition);
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);

// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
if (byteBuf.hasRemaining()) {
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();

// Each loop iteration processes one Document Sequence
// When there are no more bytes remaining, there are no more Document Sequences
while (byteBuf.hasRemaining()) {
// skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
byteBuf.position(byteBuf.position() + 1);
int sequenceStart = byteBuf.position();
int sequenceSizeInBytes = byteBuf.getInt();
int sectionEnd = sequenceStart + sequenceSizeInBytes;

String fieldName = getSequenceIdentifier(byteBuf);
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
// so, this method will need to change in order to append the value to the correct nested document.
assertFalse(fieldName.contains("."));

ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
try {
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
} finally {
documentsByteBufSlice.release();
}
return commandBsonDocument;
} else {
return byteBufBsonDocument;
byteBuf.position(sectionEnd);
}
} finally {
byteBuf.release();
return commandBsonDocument;
} else {
return byteBufBsonDocument;
}
} finally {
byteBuffers.forEach(ByteBuf::release);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To understand whether removing byteBuffers.forEach(ByteBuf::release) here is correct, it is necessary to understand the reference counting–related semantics of the CompositeByteBuf(List<ByteBuf> buffers) constructor called above. That, in turn, necessitates understanding the reference counting–related semantics of the org.bson.ByteBuf.asReadOnly method, which is "helpfully" not specified.

A reader may omit everything from here and to "TL;DR".

So now it is necessary to figure out / establish the aforementioned semantics of org.bson.ByteBuf.asReadOnly, which requires us to look at all of the implementations (I'll refer to a buffer returned from org.bson.ByteBuf.asReadOnly as "derived", and a buffer on which asReadOnly is called as "parent", as this is how those are called in Netty):

  • ByteBufNIO.asReadOnly
    • The derived buffer has its own reference count.
    • The underlying buffer is derived from the parent's underlying buffer, has its own reference count (the strong Java reference).
    • The underlying buffer's reference count is decremented (the strong Java reference is removed) when the overlying buffer's reference count becomes 0; it is not changed in other ways.
      • The above matches the semantics of ByteBufNIO.duplicate, but nothing else.
  • NettyByteBuf.asReadOnly
    • [current]
      • The derived buffer shares the reference count with the parent (because it is the same buffer); asReadOnly does not change the count.
      • The underlying buffer shares reference count with the parent's underlying buffer (because it is the same buffer); asReadOnly does not change the count.
      • The underlying buffer shares the reference count with the overlying buffer.
    • [proposed]
      • [same] The derived buffer shares the reference count with the parent (because it is the same buffer); asReadOnly does not change the count.
      • [same] The underlying buffer shares reference count with the parent's underlying buffer (because it is the same buffer); asReadOnly does not change the count.
      • [different] The underlying buffer's reference count is incremented/decremented together with the overlying buffer's reference count.
  • CompositeByteBuf.asReadOnly
    • Throws UnsupportedOperationException.

Thus, deriving the semantics of org.bson.ByteBuf.asReadOnly from its implementations is impossible, as they are all different. Assuming that the reference counting–related semantics of org.bson.ByteBuf.asReadOnly and duplicate must be identical to each other(1), let's look at all the implementations of org.bson.ByteBuf.duplicate:

  • ByteBufNIO.duplicate
    • Matches the semantics of ByteBufNIO.asReadOnly, but nothing else.
  • NettyByteBuf.duplicate
    • [current]
      • The derived buffer shares the reference count with the parent; duplicate increments the count as per the two below items.
      • The underlying buffer is derived from the parent's underlying buffer, shares the reference count with its own parent; duplicate increments the count.
      • The underlying buffer shares the reference count with the overlying buffer.
    • [proposed in the PR]
      • [different] The derived buffer has its own reference count.
      • [same] The underlying buffer is derived from the parent's underlying buffer, shares the reference count with its own parent; duplicate increments the count.
      • [different] The underlying buffer's reference count is incremented/decremented together with the overlying buffer's reference count.
  • CompositeByteBuf.duplicate
    • [current]
      • The derived buffer has its own reference count.
      • The underlying buffer (there are multiple) shares reference count with the parent's underlying buffer (because it is the same buffer); duplicate does not change the count.
      • The underlying buffer's reference count is not changed in any ways.
    • [proposed in the PR]
      • [same] The derived buffer has its own reference count.
      • [different] The underlying buffer (there are multiple) is derived from the parent's underlying buffer with semantics of org.bson.ByteBuf.duplicate, which, as we have established, depends on the implementation.
      • [different] The underlying buffer's reference count is incremented/decremented together with the overlying buffer's reference count.

TL;DR

Given the above, I find it virtually impossible to reason about the correctness of both the code before the PR, and the proposed changes. So I added sample tests (I am not suggesting that their current locations is the right one, nor that they should be added to the codebase):

  • CompositeByteBufTest.complexTest imitates what's going on in CommandMessage.getCommandDocument;
  • CompositeByteBufTest.release/asReadOnly/duplicate/duplicateComposite expose more issues.

Ideally, we should:

  1. come up with the reference counting–related semantics of relevant methods of org.bson.ByteBuf;
  2. make all implementations adhere to that semantics;
  3. update the driver code such that it uses org.bson.ByteBuf and all its implementations in accordance to the established semantics.

I am not insisting on us doing that now, but nor do I see how to approve the current PR: one one hand, it definitely solves something, on the other hand, I have no idea if it breaks something else, and how bad the effect may be. Some sample tests definitely behave differently when run against main, though still fail; this suggests to me that with this PR we may trade some existing bugs to some other bugs.


(1) Netty calls such buffers "derived", and they have the following semantics: "a parent buffer and its derived buffers share the same reference count, and the reference count does not increase when a derived buffer is created".

byteBuf.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.String.format;
import static com.mongodb.assertions.Assertions.assertTrue;
import static org.bson.assertions.Assertions.isTrueArgument;
import static org.bson.assertions.Assertions.notNull;

Expand All @@ -49,8 +50,12 @@ class CompositeByteBuf implements ByteBuf {
limit = components.get(components.size() - 1).endOffset;
}

CompositeByteBuf(final CompositeByteBuf from) {
components = from.components;
private CompositeByteBuf(final CompositeByteBuf from) {
notNull("from", from);
components = new ArrayList<>(from.components.size());
from.components.forEach(component ->
components.add(new Component(component.buffer.duplicate(), component.offset))
);
position = from.position();
limit = from.limit();
}
Expand Down Expand Up @@ -306,15 +311,22 @@ public ByteBuf retain() {
referenceCount.decrementAndGet();
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
}
components.forEach(c -> c.buffer.retain());
return this;
}

@Override
public void release() {
if (referenceCount.decrementAndGet() < 0) {
int decrementedReferenceCount = referenceCount.decrementAndGet();
if (decrementedReferenceCount < 0) {
referenceCount.incrementAndGet();
throw new IllegalStateException("Attempted to decrement the reference count below 0");
}
components.forEach(c -> c.buffer.release());
if (decrementedReferenceCount == 0) {
assertTrue(components.stream().noneMatch(c -> c.buffer.getReferenceCount() > 0),
"All component buffers should have reference count 0 when CompositeByteBuf is fully released, but some still have references.");
Copy link
Member

@stIncMale stIncMale Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion combined with the fact that release/retain update referenceCount together with the reference count of each component.buffer, suggests to me that immediately after (in program order) constructing a CompositeByteBuf instance, the reference count of each component.buffer must be equal to referenceCount, which is 1. If this reasoning is correct, we should introduce such an assertion to the constructors of CompositeByteBuf. See #1825 (comment) for the proposed code change.

}
}

private Component findComponent(final int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class NettyByteBuf implements ByteBuf {

private final AtomicInteger referenceCount = new AtomicInteger(1);
private io.netty.buffer.ByteBuf proxied;
private boolean isWriting = true;

Expand Down Expand Up @@ -271,17 +272,25 @@ public ByteBuffer asNIO() {

@Override
public int getReferenceCount() {
return proxied.refCnt();
return referenceCount.get();
}

@Override
public ByteBuf retain() {
if (referenceCount.incrementAndGet() == 1) {
referenceCount.decrementAndGet();
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
}
proxied.retain();
return this;
}

@Override
public void release() {
if (referenceCount.decrementAndGet() < 0) {
referenceCount.incrementAndGet();
throw new IllegalStateException("Attempted to decrement the reference count below 0");
}
proxied.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.mongodb.connection.SocketSettings
import com.mongodb.internal.connection.netty.NettyStreamFactory
import org.bson.BsonDocument
import org.bson.BsonInt32
import spock.lang.Ignore
import spock.lang.Specification

import java.util.concurrent.CountDownLatch
Expand All @@ -44,6 +43,7 @@ class CommandHelperSpecification extends Specification {
InternalConnection connection

def setup() {
InternalStreamConnection.setRecordEverything(true) // Ensures implementation can log as expected
connection = new InternalStreamConnectionFactory(ClusterConnectionMode.SINGLE,
new NettyStreamFactory(SocketSettings.builder().build(), getSslSettings()),
getCredentialWithCache(), CLIENT_METADATA, [], LoggerSettings.builder().build(), null, getServerApi())
Expand All @@ -55,7 +55,7 @@ class CommandHelperSpecification extends Specification {
connection?.close()
}

@Ignore("JAVA-5982")

def 'should execute command asynchronously'() {
when:
BsonDocument receivedDocument = null
Expand Down
Loading