Skip to content

Commit 5ebeb36

Browse files
author
Oleksandr Poliakov
committed
CSHARP-3984: Remove BinaryConnection.DropBox
1 parent c96b803 commit 5ebeb36

File tree

4 files changed

+46
-356
lines changed

4 files changed

+46
-356
lines changed

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 45 additions & 185 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
using System;
1717
using System.Buffers.Binary;
18-
using System.Collections.Concurrent;
1918
using System.Diagnostics;
2019
using System.IO;
2120
using System.Net;
@@ -46,15 +45,12 @@ internal sealed class BinaryConnection : IConnection
4645
private ConnectionInitializerContext _connectionInitializerContext;
4746
private EndPoint _endPoint;
4847
private ConnectionDescription _description;
49-
private readonly Dropbox _dropbox = new Dropbox();
5048
private bool _failedEventHasBeenRaised;
5149
private DateTime _lastUsedAtUtc;
5250
private DateTime _openedAtUtc;
5351
private readonly object _openLock = new object();
5452
private Task _openTask;
55-
private readonly SemaphoreSlim _receiveLock;
5653
private CompressorType? _sendCompressorType;
57-
private readonly SemaphoreSlim _sendLock;
5854
private readonly ConnectionSettings _settings;
5955
private readonly InterlockedInt32 _state;
6056
private Stream _stream;
@@ -79,8 +75,6 @@ public BinaryConnection(
7975
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));
8076

8177
_connectionId = new ConnectionId(serverId, settings.ConnectionIdLocalValueProvider());
82-
_receiveLock = new SemaphoreSlim(1);
83-
_sendLock = new SemaphoreSlim(1);
8478
_state = new InterlockedInt32(State.Initial);
8579

8680
_compressorSource = new CompressorSource(settings.Compressors);
@@ -173,9 +167,6 @@ private void Dispose(bool disposing)
173167
_eventLogger.LogAndPublish(new ConnectionClosingEvent(_connectionId, EventContext.OperationId));
174168

175169
var stopwatch = Stopwatch.StartNew();
176-
_receiveLock.Dispose();
177-
_sendLock.Dispose();
178-
179170
if (_stream != null)
180171
{
181172
try
@@ -356,50 +347,6 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
356347
}
357348
}
358349

359-
private IByteBuffer ReceiveBuffer(OperationContext operationContext, int responseTo)
360-
{
361-
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
362-
{
363-
var messageTask = _dropbox.GetMessageAsync(responseTo);
364-
try
365-
{
366-
Task.WaitAny(messageTask, receiveLockRequest.Task);
367-
if (messageTask.IsCompleted)
368-
{
369-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
370-
}
371-
372-
receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions
373-
while (true)
374-
{
375-
try
376-
{
377-
var buffer = ReceiveBuffer(operationContext);
378-
_dropbox.AddMessage(buffer);
379-
}
380-
catch (Exception ex)
381-
{
382-
_dropbox.AddException(ex);
383-
}
384-
385-
if (messageTask.IsCompleted)
386-
{
387-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
388-
}
389-
390-
operationContext.ThrowIfTimedOutOrCanceled();
391-
}
392-
}
393-
catch
394-
{
395-
var ignored = messageTask.ContinueWith(
396-
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
397-
TaskContinuationOptions.OnlyOnRanToCompletion);
398-
throw;
399-
}
400-
}
401-
}
402-
403350
private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext)
404351
{
405352
try
@@ -425,50 +372,6 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
425372
}
426373
}
427374

428-
private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationContext, int responseTo)
429-
{
430-
using (var receiveLockRequest = new SemaphoreSlimRequest(_receiveLock, operationContext.RemainingTimeout, operationContext.CancellationToken))
431-
{
432-
var messageTask = _dropbox.GetMessageAsync(responseTo);
433-
try
434-
{
435-
await Task.WhenAny(messageTask, receiveLockRequest.Task).ConfigureAwait(false);
436-
if (messageTask.IsCompleted)
437-
{
438-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
439-
}
440-
441-
await receiveLockRequest.Task.ConfigureAwait(false); // propagate exceptions
442-
while (true)
443-
{
444-
try
445-
{
446-
var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false);
447-
_dropbox.AddMessage(buffer);
448-
}
449-
catch (Exception ex)
450-
{
451-
_dropbox.AddException(ex);
452-
}
453-
454-
if (messageTask.IsCompleted)
455-
{
456-
return _dropbox.RemoveMessage(responseTo); // also propagates exception if any
457-
}
458-
459-
operationContext.ThrowIfTimedOutOrCanceled();
460-
}
461-
}
462-
catch
463-
{
464-
var ignored = messageTask.ContinueWith(
465-
t => { _dropbox.RemoveMessage(responseTo).Dispose(); },
466-
TaskContinuationOptions.OnlyOnRanToCompletion);
467-
throw;
468-
}
469-
}
470-
}
471-
472375
public ResponseMessage ReceiveMessage(
473376
OperationContext operationContext,
474377
int responseTo,
@@ -482,11 +385,19 @@ public ResponseMessage ReceiveMessage(
482385
try
483386
{
484387
helper.ReceivingMessage();
485-
using (var buffer = ReceiveBuffer(operationContext, responseTo))
388+
while (true)
486389
{
487-
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
488-
helper.ReceivedMessage(buffer, message);
489-
return message;
390+
using (var buffer = ReceiveBuffer(operationContext))
391+
{
392+
if (responseTo != GetResponseTo(buffer))
393+
{
394+
continue;
395+
}
396+
397+
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
398+
helper.ReceivedMessage(buffer, message);
399+
return message;
400+
}
490401
}
491402
}
492403
catch (Exception ex)
@@ -497,7 +408,9 @@ public ResponseMessage ReceiveMessage(
497408
}
498409
}
499410

500-
public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operationContext, int responseTo,
411+
public async Task<ResponseMessage> ReceiveMessageAsync(
412+
OperationContext operationContext,
413+
int responseTo,
501414
IMessageEncoderSelector encoderSelector,
502415
MessageEncoderSettings messageEncoderSettings)
503416
{
@@ -508,11 +421,19 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
508421
try
509422
{
510423
helper.ReceivingMessage();
511-
using (var buffer = await ReceiveBufferAsync(operationContext, responseTo).ConfigureAwait(false))
424+
while (true)
512425
{
513-
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
514-
helper.ReceivedMessage(buffer, message);
515-
return message;
426+
using (var buffer = await ReceiveBufferAsync(operationContext).ConfigureAwait(false))
427+
{
428+
if (responseTo != GetResponseTo(buffer))
429+
{
430+
continue;
431+
}
432+
433+
var message = helper.DecodeMessage(operationContext, buffer, encoderSelector);
434+
helper.ReceivedMessage(buffer, message);
435+
return message;
436+
}
516437
}
517438
}
518439
catch (Exception ex)
@@ -523,59 +444,39 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
523444
}
524445
}
525446

447+
private int GetResponseTo(IByteBuffer message)
448+
{
449+
var backingBytes = message.AccessBackingBytes(8);
450+
return BitConverter.ToInt32(backingBytes.Array, backingBytes.Offset);
451+
}
452+
526453
private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)
527454
{
528-
_sendLock.Wait(operationContext.RemainingTimeout, operationContext.CancellationToken);
529455
try
530456
{
531-
if (_state.Value == State.Failed)
532-
{
533-
throw new MongoConnectionClosedException(_connectionId);
534-
}
535-
536-
try
537-
{
538-
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length);
539-
_lastUsedAtUtc = DateTime.UtcNow;
540-
}
541-
catch (Exception ex)
542-
{
543-
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
544-
ConnectionFailed(wrappedException ?? ex);
545-
if (wrappedException == null) { throw; } else { throw wrappedException; }
546-
}
457+
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length);
458+
_lastUsedAtUtc = DateTime.UtcNow;
547459
}
548-
finally
460+
catch (Exception ex)
549461
{
550-
_sendLock.Release();
462+
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
463+
ConnectionFailed(wrappedException ?? ex);
464+
if (wrappedException == null) { throw; } else { throw wrappedException; }
551465
}
552466
}
553467

554468
private async Task SendBufferAsync(OperationContext operationContext, IByteBuffer buffer)
555469
{
556-
await _sendLock.WaitAsync(operationContext.RemainingTimeout, operationContext.CancellationToken).ConfigureAwait(false);
557470
try
558471
{
559-
if (_state.Value == State.Failed)
560-
{
561-
throw new MongoConnectionClosedException(_connectionId);
562-
}
563-
564-
try
565-
{
566-
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length).ConfigureAwait(false);
567-
_lastUsedAtUtc = DateTime.UtcNow;
568-
}
569-
catch (Exception ex)
570-
{
571-
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
572-
ConnectionFailed(wrappedException ?? ex);
573-
if (wrappedException == null) { throw; } else { throw wrappedException; }
574-
}
472+
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length).ConfigureAwait(false);
473+
_lastUsedAtUtc = DateTime.UtcNow;
575474
}
576-
finally
475+
catch (Exception ex)
577476
{
578-
_sendLock.Release();
477+
var wrappedException = WrapExceptionIfRequired(ex, "sending a message to the server");
478+
ConnectionFailed(wrappedException ?? ex);
479+
if (wrappedException == null) { throw; } else { throw wrappedException; }
579480
}
580481
}
581482

@@ -770,47 +671,6 @@ private void ThrowOperationCanceledExceptionIfRequired(Exception exception)
770671
}
771672

772673
// nested classes
773-
private class Dropbox
774-
{
775-
private readonly ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>> _messages = new ConcurrentDictionary<int, TaskCompletionSource<IByteBuffer>>();
776-
777-
// public methods
778-
public void AddException(Exception exception)
779-
{
780-
foreach (var taskCompletionSource in _messages.Values)
781-
{
782-
taskCompletionSource.TrySetException(exception); // has no effect on already completed tasks
783-
}
784-
}
785-
786-
public void AddMessage(IByteBuffer message)
787-
{
788-
var responseTo = GetResponseTo(message);
789-
var tcs = _messages.GetOrAdd(responseTo, x => new TaskCompletionSource<IByteBuffer>());
790-
tcs.TrySetResult(message);
791-
}
792-
793-
public Task<IByteBuffer> GetMessageAsync(int responseTo)
794-
{
795-
var tcs = _messages.GetOrAdd(responseTo, _ => new TaskCompletionSource<IByteBuffer>());
796-
return tcs.Task;
797-
}
798-
799-
public IByteBuffer RemoveMessage(int responseTo)
800-
{
801-
TaskCompletionSource<IByteBuffer> tcs;
802-
_messages.TryRemove(responseTo, out tcs);
803-
return tcs.Task.GetAwaiter().GetResult(); // RemoveMessage is only called when Task is complete
804-
}
805-
806-
// private methods
807-
private int GetResponseTo(IByteBuffer message)
808-
{
809-
var backingBytes = message.AccessBackingBytes(8);
810-
return BinaryPrimitives.ReadInt32LittleEndian(new ReadOnlySpan<byte>(backingBytes.Array, backingBytes.Offset, 4));
811-
}
812-
}
813-
814674
private class OpenConnectionHelper
815675
{
816676
private readonly BinaryConnection _connection;

src/MongoDB.Driver/Core/Misc/SemaphoreSlimRequest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace MongoDB.Driver.Core.Misc
2222
/// <summary>
2323
/// Represents a tentative request to acquire a SemaphoreSlim.
2424
/// </summary>
25+
[Obsolete("SemaphoreSlimRequest is deprecated and will be removed in future release")]
2526
public sealed class SemaphoreSlimRequest : IDisposable
2627
{
2728
// private fields

tests/MongoDB.Driver.Tests/Core/Connections/BinaryConnectionTests.cs

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -489,52 +489,6 @@ public async Task ReceiveMessage_should_complete_when_reply_is_not_already_on_th
489489
}
490490
}
491491

492-
[Theory]
493-
[ParameterAttributeData]
494-
public async Task ReceiveMessage_should_handle_out_of_order_replies(
495-
[Values(false, true)]
496-
bool async1,
497-
[Values(false, true)]
498-
bool async2)
499-
{
500-
using (var stream = new BlockingMemoryStream())
501-
{
502-
_mockStreamFactory.Setup(f => f.CreateStreamAsync(_endPoint, It.IsAny<CancellationToken>())).ReturnsAsync(stream);
503-
await _subject.OpenAsync(OperationContext.NoTimeout);
504-
_capturedEvents.Clear();
505-
506-
var encoderSelector = new ReplyMessageEncoderSelector<BsonDocument>(BsonDocumentSerializer.Instance);
507-
508-
var receivedTask10 = async1 ?
509-
_subject.ReceiveMessageAsync(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings) :
510-
Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 10, encoderSelector, _messageEncoderSettings));
511-
512-
var receivedTask11 = async2 ?
513-
_subject.ReceiveMessageAsync(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings) :
514-
Task.Run(() => _subject.ReceiveMessage(OperationContext.NoTimeout, 11, encoderSelector, _messageEncoderSettings));
515-
516-
SpinWait.SpinUntil(() => _capturedEvents.Count >= 2, TimeSpan.FromSeconds(5)).Should().BeTrue();
517-
518-
var messageToReceive10 = MessageHelper.BuildReply<BsonDocument>(new BsonDocument("_id", 10), BsonDocumentSerializer.Instance, responseTo: 10);
519-
var messageToReceive11 = MessageHelper.BuildReply<BsonDocument>(new BsonDocument("_id", 11), BsonDocumentSerializer.Instance, responseTo: 11);
520-
MessageHelper.WriteResponsesToStream(stream, messageToReceive11, messageToReceive10); // out of order
521-
522-
var received10 = await receivedTask10;
523-
var received11 = await receivedTask11;
524-
525-
var expected = MessageHelper.TranslateMessagesToBsonDocuments(new[] { messageToReceive10, messageToReceive11 });
526-
var actual = MessageHelper.TranslateMessagesToBsonDocuments(new[] { received10, received11 });
527-
528-
actual.Should().BeEquivalentTo(expected);
529-
530-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivingMessageEvent>();
531-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivingMessageEvent>();
532-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivedMessageEvent>();
533-
_capturedEvents.Next().Should().BeOfType<ConnectionReceivedMessageEvent>();
534-
_capturedEvents.Any().Should().BeFalse();
535-
}
536-
}
537-
538492
[Fact]
539493
public async Task ReceiveMessage_should_not_produce_unobserved_task_exceptions_on_fail()
540494
{

0 commit comments

Comments
 (0)