From 5ebeb3633c16baaf43d28f822832d08597e3f159 Mon Sep 17 00:00:00 2001 From: Oleksandr Poliakov Date: Wed, 30 Jul 2025 11:34:38 -0700 Subject: [PATCH] CSHARP-3984: Remove BinaryConnection.DropBox --- .../Core/Connections/BinaryConnection.cs | 230 ++++-------------- .../Core/Misc/SemaphoreSlimRequest.cs | 1 + .../Core/Connections/BinaryConnectionTests.cs | 46 ---- .../Core/Misc/SemaphoreSlimRequestTests.cs | 125 ---------- 4 files changed, 46 insertions(+), 356 deletions(-) delete mode 100644 tests/MongoDB.Driver.Tests/Core/Misc/SemaphoreSlimRequestTests.cs diff --git a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs index 1bb259b94eb..76d2c80c707 100644 --- a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs +++ b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs @@ -15,7 +15,6 @@ using System; using System.Buffers.Binary; -using System.Collections.Concurrent; using System.Diagnostics; using System.IO; using System.Net; @@ -46,15 +45,12 @@ internal sealed class BinaryConnection : IConnection private ConnectionInitializerContext _connectionInitializerContext; private EndPoint _endPoint; private ConnectionDescription _description; - private readonly Dropbox _dropbox = new Dropbox(); private bool _failedEventHasBeenRaised; private DateTime _lastUsedAtUtc; private DateTime _openedAtUtc; private readonly object _openLock = new object(); private Task _openTask; - private readonly SemaphoreSlim _receiveLock; private CompressorType? _sendCompressorType; - private readonly SemaphoreSlim _sendLock; private readonly ConnectionSettings _settings; private readonly InterlockedInt32 _state; private Stream _stream; @@ -79,8 +75,6 @@ public BinaryConnection( Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber)); _connectionId = new ConnectionId(serverId, settings.ConnectionIdLocalValueProvider()); - _receiveLock = new SemaphoreSlim(1); - _sendLock = new SemaphoreSlim(1); _state = new InterlockedInt32(State.Initial); _compressorSource = new CompressorSource(settings.Compressors); @@ -173,9 +167,6 @@ private void Dispose(bool disposing) _eventLogger.LogAndPublish(new ConnectionClosingEvent(_connectionId, EventContext.OperationId)); var stopwatch = Stopwatch.StartNew(); - _receiveLock.Dispose(); - _sendLock.Dispose(); - if (_stream != null) { try @@ -356,50 +347,6 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext) } } - private IByteBuffer ReceiveBuffer(OperationContext operationContext, int responseTo) - { - using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken)) - { - var messageTask = _dropbox.GetMessageAsync(responseTo); - try - { - Task.WaitAny(messageTask, receiveLockRequest.Task); - if (messageTask.IsCompleted) - { - return _dropbox.RemoveMessage(responseTo); // also propagates exception if any - } - - receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions - while (true) - { - try - { - var buffer = ReceiveBuffer(operationContext); - _dropbox.AddMessage(buffer); - } - catch (Exception ex) - { - _dropbox.AddException(ex); - } - - if (messageTask.IsCompleted) - { - return _dropbox.RemoveMessage(responseTo); // also propagates exception if any - } - - operationContext.ThrowIfTimedOutOrCanceled(); - } - } - catch - { - var ignored = messageTask.ContinueWith( - t => { _dropbox.RemoveMessage(responseTo).Dispose(); }, - TaskContinuationOptions.OnlyOnRanToCompletion); - throw; - } - } - } - private async Task ReceiveBufferAsync(OperationContext operationContext) { try @@ -425,50 +372,6 @@ private async Task ReceiveBufferAsync(OperationContext operationCon } } - private async Task ReceiveBufferAsync(OperationContext operationContext, int responseTo) - { - using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken)) - { - var messageTask = _dropbox.GetMessageAsync(responseTo); - try - { - await Task.WhenAny(messageTask, receiveLockRequest.Task).ConfigureAwait(false); - if (messageTask.IsCompleted) - { - return _dropbox.RemoveMessage(responseTo); // also propagates exception if any - } - - await receiveLockRequest.Task.ConfigureAwait(false); // propagate exceptions - while (true) - { - try - { - var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false); - _dropbox.AddMessage(buffer); - } - catch (Exception ex) - { - _dropbox.AddException(ex); - } - - if (messageTask.IsCompleted) - { - return _dropbox.RemoveMessage(responseTo); // also propagates exception if any - } - - operationContext.ThrowIfTimedOutOrCanceled(); - } - } - catch - { - var ignored = messageTask.ContinueWith( - t => { _dropbox.RemoveMessage(responseTo).Dispose(); }, - TaskContinuationOptions.OnlyOnRanToCompletion); - throw; - } - } - } - public ResponseMessage ReceiveMessage( OperationContext operationContext, int responseTo, @@ -482,11 +385,19 @@ public ResponseMessage ReceiveMessage( try { helper.ReceivingMessage(); - using (var buffer = ReceiveBuffer(operationContext, responseTo)) + while (true) { - var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); - helper.ReceivedMessage(buffer, message); - return message; + using (var buffer = ReceiveBuffer(operationContext)) + { + if (responseTo != GetResponseTo(buffer)) + { + continue; + } + + var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); + helper.ReceivedMessage(buffer, message); + return message; + } } } catch (Exception ex) @@ -497,7 +408,9 @@ public ResponseMessage ReceiveMessage( } } - public async Task ReceiveMessageAsync(OperationContext operationContext, int responseTo, + public async Task ReceiveMessageAsync( + OperationContext operationContext, + int responseTo, IMessageEncoderSelector encoderSelector, MessageEncoderSettings messageEncoderSettings) { @@ -508,11 +421,19 @@ public async Task ReceiveMessageAsync(OperationContext operatio try { helper.ReceivingMessage(); - using (var buffer = await ReceiveBufferAsync(operationContext, responseTo).ConfigureAwait(false)) + while (true) { - var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); - helper.ReceivedMessage(buffer, message); - return message; + using (var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false)) + { + if (responseTo != GetResponseTo(buffer)) + { + continue; + } + + var message = helper.DecodeMessage(operationContext, buffer, encoderSelector); + helper.ReceivedMessage(buffer, message); + return message; + } } } catch (Exception ex) @@ -523,59 +444,39 @@ public async Task ReceiveMessageAsync(OperationContext operatio } } + private int GetResponseTo(IByteBuffer message) + { + var backingBytes = message.AccessBackingBytes(8); + return BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset); + } + private void SendBuffer(OperationContext operationContext, IByteBuffer buffer) { - _sendLock.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken); try { - if (_state.Value == State.Failed) - { - throw new MongoConnectionClosedException(_connectionId); - } - - try - { - _stream.WriteBytes(operationContext, buffer, 0, buffer.Length); - _lastUsedAtUtc = DateTime.UtcNow; - } - catch (Exception ex) - { - var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server"); - ConnectionFailed(wrappedException ?? ex); - if (wrappedException == null) { throw; } else { throw wrappedException; } - } + _stream.WriteBytes(operationContext, buffer, 0, buffer.Length); + _lastUsedAtUtc = DateTime.UtcNow; } - finally + catch (Exception ex) { - _sendLock.Release(); + var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server"); + ConnectionFailed(wrappedException ?? ex); + if (wrappedException == null) { throw; } else { throw wrappedException; } } } private async Task SendBufferAsync(OperationContext operationContext, IByteBuffer buffer) { - await _sendLock.WaitAsync(operationContext.RemainingTimeout, operationContext.CancellationToken).ConfigureAwait(false); try { - if (_state.Value == State.Failed) - { - throw new MongoConnectionClosedException(_connectionId); - } - - try - { - await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length).ConfigureAwait(false); - _lastUsedAtUtc = DateTime.UtcNow; - } - catch (Exception ex) - { - var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server"); - ConnectionFailed(wrappedException ?? ex); - if (wrappedException == null) { throw; } else { throw wrappedException; } - } + await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length).ConfigureAwait(false); + _lastUsedAtUtc = DateTime.UtcNow; } - finally + catch (Exception ex) { - _sendLock.Release(); + var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server"); + ConnectionFailed(wrappedException ?? ex); + if (wrappedException == null) { throw; } else { throw wrappedException; } } } @@ -770,47 +671,6 @@ private void ThrowOperationCanceledExceptionIfRequired(Exception exception) } // nested classes - private class Dropbox - { - private readonly ConcurrentDictionary> _messages = new ConcurrentDictionary>(); - - // public methods - public void AddException(Exception exception) - { - foreach (var taskCompletionSource in _messages.Values) - { - taskCompletionSource.TrySetException(exception); // has no effect on already completed tasks - } - } - - public void AddMessage(IByteBuffer message) - { - var responseTo = GetResponseTo(message); - var tcs = _messages.GetOrAdd(responseTo, x => new TaskCompletionSource()); - tcs.TrySetResult(message); - } - - public Task GetMessageAsync(int responseTo) - { - var tcs = _messages.GetOrAdd(responseTo, _ => new TaskCompletionSource()); - return tcs.Task; - } - - public IByteBuffer RemoveMessage(int responseTo) - { - TaskCompletionSource tcs; - _messages.TryRemove(responseTo, out tcs); - return tcs.Task.GetAwaiter().GetResult(); // RemoveMessage is only called when Task is complete - } - - // private methods - private int GetResponseTo(IByteBuffer message) - { - var backingBytes = message.AccessBackingBytes(8); - return BinaryPrimitives.ReadInt32LittleEndian(new ReadOnlySpan(backingBytes.Array, backingBytes.Offset, 4)); - } - } - private class OpenConnectionHelper { private readonly BinaryConnection _connection; diff --git a/src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs b/src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs index 6ef047c725a..8c583fa72d8 100644 --- a/src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs +++ b/src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs @@ -22,6 +22,7 @@ namespace MongoDB.Driver.Core.Misc /// /// Represents a tentative request to acquire a SemaphoreSlim. /// + [Obsolete("SemaphoreSlimRequest is deprecated and will be removed in future release")] public sealed class SemaphoreSlimRequest : IDisposable { // private fields diff --git a/tests/MongoDB.Driver.Tests/Core/Connections/BinaryConnectionTests.cs b/tests/MongoDB.Driver.Tests/Core/Connections/BinaryConnectionTests.cs index 82735c1b59f..0e4d0608148 100644 --- a/tests/MongoDB.Driver.Tests/Core/Connections/BinaryConnectionTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Connections/BinaryConnectionTests.cs @@ -489,52 +489,6 @@ public async Task ReceiveMessage_should_complete_when_reply_is_not_already_on_th } } - [Theory] - [ParameterAttributeData] - public async Task ReceiveMessage_should_handle_out_of_order_replies( - [Values(false, true)] - bool async1, - [Values(false, true)] - bool async2) - { - using (var stream = new BlockingMemoryStream()) - { - _mockStreamFactory.Setup(f => f.CreateStreamAsync(_endPoint, It.IsAny())).ReturnsAsync(stream); - await _subject.OpenAsync(OperationContext.NoTimeout); - _capturedEvents.Clear(); - - var encoderSelector = new ReplyMessageEncoderSelector(BsonDocumentSerializer.Instance); - - var receivedTask10 = async1 ? - _subject.ReceiveMessageAsync(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings) : - Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings)); - - var receivedTask11 = async2 ? - _subject.ReceiveMessageAsync(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings) : - Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings)); - - SpinWait.SpinUntil(() => _capturedEvents.Count >= 2, TimeSpan.FromSeconds(5)).Should().BeTrue(); - - var messageToReceive10 = MessageHelper.BuildReply(new BsonDocument("_id", 10), BsonDocumentSerializer.Instance, responseTo: 10); - var messageToReceive11 = MessageHelper.BuildReply(new BsonDocument("_id", 11), BsonDocumentSerializer.Instance, responseTo: 11); - MessageHelper.WriteResponsesToStream(stream, messageToReceive11, messageToReceive10); // out of order - - var received10 = await receivedTask10; - var received11 = await receivedTask11; - - var expected = MessageHelper.TranslateMessagesToBsonDocuments(new[] { messageToReceive10, messageToReceive11 }); - var actual = MessageHelper.TranslateMessagesToBsonDocuments(new[] { received10, received11 }); - - actual.Should().BeEquivalentTo(expected); - - _capturedEvents.Next().Should().BeOfType(); - _capturedEvents.Next().Should().BeOfType(); - _capturedEvents.Next().Should().BeOfType(); - _capturedEvents.Next().Should().BeOfType(); - _capturedEvents.Any().Should().BeFalse(); - } - } - [Fact] public async Task ReceiveMessage_should_not_produce_unobserved_task_exceptions_on_fail() { diff --git a/tests/MongoDB.Driver.Tests/Core/Misc/SemaphoreSlimRequestTests.cs b/tests/MongoDB.Driver.Tests/Core/Misc/SemaphoreSlimRequestTests.cs deleted file mode 100644 index 7dca0e28f9a..00000000000 --- a/tests/MongoDB.Driver.Tests/Core/Misc/SemaphoreSlimRequestTests.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* Copyright 2015-present MongoDB Inc. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; -using FluentAssertions; -using Xunit; - -namespace MongoDB.Driver.Core.Misc -{ - public class SemaphoreSlimRequestTests - { - // public methods - [Fact] - public void constructor_should_initialize_instance_with_completed_task_when_semaphore_is_available() - { - var semaphore = new SemaphoreSlim(1); - - var result = new SemaphoreSlimRequest(semaphore, CancellationToken.None); - - result.Task.Status.Should().Be(TaskStatus.RanToCompletion); - semaphore.CurrentCount.Should().Be(0); - } - - [Fact] - public void constructor_should_initialize_instance_with_incompleted_task_when_semaphore_is_not_available() - { - var semaphore = new SemaphoreSlim(1); - semaphore.Wait(); - - var result = new SemaphoreSlimRequest(semaphore, CancellationToken.None); - - result.Task.IsCompleted.Should().BeFalse(); - semaphore.CurrentCount.Should().Be(0); - } - - [Fact] - public void constructor_should_throw_when_semaphore_is_null() - { - Action action = () => new SemaphoreSlimRequest(null, CancellationToken.None); - - action.ShouldThrow().And.ParamName.Should().Be("semaphore"); - } - - [Fact] - public void Dispose_should_cancel_pending_request() - { - var semaphore = new SemaphoreSlim(1); - semaphore.Wait(); - var subject = new SemaphoreSlimRequest(semaphore, CancellationToken.None); - - subject.Dispose(); - semaphore.Release(); - - subject.Task.Status.Should().Be(TaskStatus.Canceled); - semaphore.CurrentCount.Should().Be(1); - } - - [Fact] - public void Dispose_should_release_semaphore() - { - var semaphore = new SemaphoreSlim(1); - var subject = new SemaphoreSlimRequest(semaphore, CancellationToken.None); - - subject.Dispose(); - - semaphore.CurrentCount.Should().Be(1); - } - - [Fact] - public void Sempahore_should_not_be_released_when_cancellation_is_requested_after_semaphore_is_acquired() - { - var semaphore = new SemaphoreSlim(1); - using var cancellationTokenSource = new CancellationTokenSource(); - var subject = new SemaphoreSlimRequest(semaphore, cancellationTokenSource.Token); - - cancellationTokenSource.Cancel(); - - semaphore.CurrentCount.Should().Be(0); - } - - [Fact] - public void Task_should_be_cancelled_when_cancellationToken_requests_cancellation() - { - var semaphore = new SemaphoreSlim(1); - using var cancellationTokenSource = new CancellationTokenSource(); - semaphore.Wait(); - var subject = new SemaphoreSlimRequest(semaphore, cancellationTokenSource.Token); - - cancellationTokenSource.Cancel(); - SpinWait.SpinUntil(() => subject.Task.IsCompleted, TimeSpan.FromSeconds(5)).Should().BeTrue(); - semaphore.Release(); - - subject.Task.Status.Should().Be(TaskStatus.Canceled); - semaphore.CurrentCount.Should().Be(1); - } - - [Fact] - public void Task_should_be_completed_when_semaphore_becomes_available() - { - var semaphore = new SemaphoreSlim(1); - semaphore.Wait(); - var subject = new SemaphoreSlimRequest(semaphore, CancellationToken.None); - - semaphore.Release(); - SpinWait.SpinUntil(() => subject.Task.IsCompleted, TimeSpan.FromSeconds(5)).Should().BeTrue(); - - subject.Task.Status.Should().Be(TaskStatus.RanToCompletion); - semaphore.CurrentCount.Should().Be(0); - } - } -}