Skip to content

CSHARP-3984: Remove BinaryConnection.DropBox #1754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 45 additions & 185 deletions src/MongoDB.Driver/Core/Connections/BinaryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
Expand Down Expand Up @@ -46,15 +45,12 @@ internal sealed class BinaryConnection : IConnection
private ConnectionInitializerContext _connectionInitializerContext;
private EndPoint _endPoint;
private ConnectionDescription _description;
private readonly Dropbox _dropbox = new Dropbox();
private bool _failedEventHasBeenRaised;
private DateTime _lastUsedAtUtc;
private DateTime _openedAtUtc;
private readonly object _openLock = new object();
private Task _openTask;
private readonly SemaphoreSlim _receiveLock;
private CompressorType? _sendCompressorType;
private readonly SemaphoreSlim _sendLock;
private readonly ConnectionSettings _settings;
private readonly InterlockedInt32 _state;
private Stream _stream;
Expand All @@ -79,8 +75,6 @@ public BinaryConnection(
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));

_connectionId = new ConnectionId(serverId, settings.ConnectionIdLocalValueProvider());
_receiveLock = new SemaphoreSlim(1);
_sendLock = new SemaphoreSlim(1);
_state = new InterlockedInt32(State.Initial);

_compressorSource = new CompressorSource(settings.Compressors);
Expand Down Expand Up @@ -173,9 +167,6 @@ private void Dispose(bool disposing)
_eventLogger.LogAndPublish(new ConnectionClosingEvent(_connectionId, EventContext.OperationId));

var stopwatch = Stopwatch.StartNew();
_receiveLock.Dispose();
_sendLock.Dispose();

if (_stream != null)
{
try
Expand Down Expand Up @@ -356,50 +347,6 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
}
}

private IByteBuffer ReceiveBuffer(OperationContext operationContext, int responseTo)
{
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
{
var messageTask = _dropbox.GetMessageAsync(responseTo);
try
{
Task.WaitAny(messageTask, receiveLockRequest.Task);
if (messageTask.IsCompleted)
{
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
}

receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions
while (true)
{
try
{
var buffer = ReceiveBuffer(operationContext);
_dropbox.AddMessage(buffer);
}
catch (Exception ex)
{
_dropbox.AddException(ex);
}

if (messageTask.IsCompleted)
{
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
}

operationContext.ThrowIfTimedOutOrCanceled();
}
}
catch
{
var ignored = messageTask.ContinueWith(
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
TaskContinuationOptions.OnlyOnRanToCompletion);
throw;
}
}
}

private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext)
{
try
Expand All @@ -425,50 +372,6 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
}
}

private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext, int responseTo)
{
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
{
var messageTask = _dropbox.GetMessageAsync(responseTo);
try
{
await Task.WhenAny(messageTask, receiveLockRequest.Task).ConfigureAwait(false);
if (messageTask.IsCompleted)
{
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
}

await receiveLockRequest.Task.ConfigureAwait(false); // propagate exceptions
while (true)
{
try
{
var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false);
_dropbox.AddMessage(buffer);
}
catch (Exception ex)
{
_dropbox.AddException(ex);
}

if (messageTask.IsCompleted)
{
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
}

operationContext.ThrowIfTimedOutOrCanceled();
}
}
catch
{
var ignored = messageTask.ContinueWith(
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
TaskContinuationOptions.OnlyOnRanToCompletion);
throw;
}
}
}

public ResponseMessage ReceiveMessage(
OperationContext operationContext,
int responseTo,
Expand All @@ -482,11 +385,19 @@ public ResponseMessage ReceiveMessage(
try
{
helper.ReceivingMessage();
using (var buffer = ReceiveBuffer(operationContext, responseTo))
while (true)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this because driver could stop waiting on response from the server at any moment, and if it happened we should ignore server's response that is not the current request. We used to have exactly the same behavior with DropBox, with only difference that DropBox would store such response forever.

Copy link
Member Author

@sanych-sun sanych-sun Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And just to have full picture: we will change this in https://jira.mongodb.org/browse/DRIVERS-2884

Accordingly to the DRIVERS-2884 Driver should perform reading of pending response BEFORE sending request.

{
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
helper.ReceivedMessage(buffer, message);
return message;
using (var buffer = ReceiveBuffer(operationContext))
{
if (responseTo != GetResponseTo(buffer))
{
continue;
}

var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
helper.ReceivedMessage(buffer, message);
return message;
}
}
}
catch (Exception ex)
Expand All @@ -497,7 +408,9 @@ public ResponseMessage ReceiveMessage(
}
}

public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operationContext, int responseTo,
public async Task<ResponseMessage> ReceiveMessageAsync(
OperationContext operationContext,
int responseTo,
IMessageEncoderSelector encoderSelector,
MessageEncoderSettings messageEncoderSettings)
{
Expand All @@ -508,11 +421,19 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
try
{
helper.ReceivingMessage();
using (var buffer = await ReceiveBufferAsync(operationContext, responseTo).ConfigureAwait(false))
while (true)
{
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
helper.ReceivedMessage(buffer, message);
return message;
using (var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false))
{
if (responseTo != GetResponseTo(buffer))
{
continue;
}

var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
helper.ReceivedMessage(buffer, message);
return message;
}
}
}
catch (Exception ex)
Expand All @@ -523,59 +444,39 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
}
}

private int GetResponseTo(IByteBuffer message)
{
var backingBytes = message.AccessBackingBytes(8);
return BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset);
}

private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)
{
_sendLock.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken);
try
{
if (_state.Value == State.Failed)
{
throw new MongoConnectionClosedException(_connectionId);
}

try
{
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
{
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
ConnectionFailed(wrappedException ?? ex);
if (wrappedException == null) { throw; } else { throw wrappedException; }
}
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length);
_lastUsedAtUtc = DateTime.UtcNow;
}
finally
catch (Exception ex)
{
_sendLock.Release();
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
ConnectionFailed(wrappedException ?? ex);
if (wrappedException == null) { throw; } else { throw wrappedException; }
}
}

private async Task SendBufferAsync(OperationContext operationContext, IByteBuffer buffer)
{
await _sendLock.WaitAsync(operationContext.RemainingTimeout, operationContext.CancellationToken).ConfigureAwait(false);
try
{
if (_state.Value == State.Failed)
{
throw new MongoConnectionClosedException(_connectionId);
}

try
{
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
{
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
ConnectionFailed(wrappedException ?? ex);
if (wrappedException == null) { throw; } else { throw wrappedException; }
}
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
}
finally
catch (Exception ex)
{
_sendLock.Release();
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
ConnectionFailed(wrappedException ?? ex);
if (wrappedException == null) { throw; } else { throw wrappedException; }
}
}

Expand Down Expand Up @@ -770,47 +671,6 @@ private void ThrowOperationCanceledExceptionIfRequired(Exception exception)
}

// nested classes
private class Dropbox
{
private readonly ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>> _messages = new ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>>();

// public methods
public void AddException(Exception exception)
{
foreach (var taskCompletionSource in _messages.Values)
{
taskCompletionSource.TrySetException(exception); // has no effect on already completed tasks
}
}

public void AddMessage(IByteBuffer message)
{
var responseTo = GetResponseTo(message);
var tcs = _messages.GetOrAdd(responseTo, x => new TaskCompletionSource<IByteBuffer>());
tcs.TrySetResult(message);
}

public Task<IByteBuffer> GetMessageAsync(int responseTo)
{
var tcs = _messages.GetOrAdd(responseTo, _ => new TaskCompletionSource<IByteBuffer>());
return tcs.Task;
}

public IByteBuffer RemoveMessage(int responseTo)
{
TaskCompletionSource<IByteBuffer> tcs;
_messages.TryRemove(responseTo, out tcs);
return tcs.Task.GetAwaiter().GetResult(); // RemoveMessage is only called when Task is complete
}

// private methods
private int GetResponseTo(IByteBuffer message)
{
var backingBytes = message.AccessBackingBytes(8);
return BinaryPrimitives.ReadInt32LittleEndian(new ReadOnlySpan<byte>(backingBytes.Array, backingBytes.Offset, 4));
}
}

private class OpenConnectionHelper
{
private readonly BinaryConnection _connection;
Expand Down
1 change: 1 addition & 0 deletions src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace MongoDB.Driver.Core.Misc
/// <summary>
/// Represents a tentative request to acquire a SemaphoreSlim.
/// </summary>
[Obsolete("SemaphoreSlimRequest is deprecated and will be removed in future release")]
public sealed class SemaphoreSlimRequest : IDisposable
{
// private fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,52 +489,6 @@ public async Task ReceiveMessage_should_complete_when_reply_is_not_already_on_th
}
}

[Theory]
[ParameterAttributeData]
public async Task ReceiveMessage_should_handle_out_of_order_replies(
[Values(false, true)]
bool async1,
[Values(false, true)]
bool async2)
{
using (var stream = new BlockingMemoryStream())
{
_mockStreamFactory.Setup(f => f.CreateStreamAsync(_endPoint, It.IsAny<CancellationToken>())).ReturnsAsync(stream);
await _subject.OpenAsync(OperationContext.NoTimeout);
_capturedEvents.Clear();

var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);

var receivedTask10 = async1 ?
_subject.ReceiveMessageAsync(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings) :
Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings));

var receivedTask11 = async2 ?
_subject.ReceiveMessageAsync(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings) :
Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings));

SpinWait.SpinUntil(() => _capturedEvents.Count >= 2, TimeSpan.FromSeconds(5)).Should().BeTrue();

var messageToReceive10 = MessageHelper.BuildReply<BsonDocument>(new BsonDocument("_id", 10), BsonDocumentSerializer.Instance, responseTo: 10);
var messageToReceive11 = MessageHelper.BuildReply<BsonDocument>(new BsonDocument("_id", 11), BsonDocumentSerializer.Instance, responseTo: 11);
MessageHelper.WriteResponsesToStream(stream, messageToReceive11, messageToReceive10); // out of order

var received10 = await receivedTask10;
var received11 = await receivedTask11;

var expected = MessageHelper.TranslateMessagesToBsonDocuments(new[] { messageToReceive10, messageToReceive11 });
var actual = MessageHelper.TranslateMessagesToBsonDocuments(new[] { received10, received11 });

actual.Should().BeEquivalentTo(expected);

_capturedEvents.Next().Should().BeOfType<ConnectionReceivingMessageEvent>();
_capturedEvents.Next().Should().BeOfType<ConnectionReceivingMessageEvent>();
_capturedEvents.Next().Should().BeOfType<ConnectionReceivedMessageEvent>();
_capturedEvents.Next().Should().BeOfType<ConnectionReceivedMessageEvent>();
_capturedEvents.Any().Should().BeFalse();
}
}

[Fact]
public async Task ReceiveMessage_should_not_produce_unobserved_task_exceptions_on_fail()
{
Expand Down
Loading