Skip to content

Commit 8e583a6

Browse files
committed
Use ResetAsync in AcquireAsync in MemcachedNode
1 parent 2875ebc commit 8e583a6

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

Enyim.Caching/Memcached/MemcachedNode.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ private class InternalPoolImpl : IDisposable
278278
private MemcachedNode ownerNode;
279279
private readonly EndPoint _endPoint;
280280
private readonly TimeSpan queueTimeout;
281+
private readonly TimeSpan _receiveTimeout;
281282
private SemaphoreSlim _semaphore;
282283

283284
private readonly object initLock = new Object();
@@ -293,11 +294,14 @@ internal InternalPoolImpl(
293294
throw new InvalidOperationException("maxItems must be larger than minItems", null);
294295
if (config.QueueTimeout < TimeSpan.Zero)
295296
throw new InvalidOperationException("queueTimeout must be >= TimeSpan.Zero", null);
297+
if (config.ReceiveTimeout < TimeSpan.Zero)
298+
throw new InvalidOperationException("ReceiveTimeout must be >= TimeSpan.Zero", null);
296299

297300
this.ownerNode = ownerNode;
298301
this.isAlive = true;
299302
_endPoint = ownerNode.EndPoint;
300303
this.queueTimeout = config.QueueTimeout;
304+
_receiveTimeout = config.ReceiveTimeout;
301305

302306
this.minItems = config.MinPoolSize;
303307
this.maxItems = config.MaxPoolSize;
@@ -356,7 +360,7 @@ internal async Task InitPoolAsync()
356360
{
357361
_freeItems.Push(await CreateSocketAsync());
358362
}
359-
catch(Exception ex)
363+
catch (Exception ex)
360364
{
361365
_logger.LogError(ex, $"Failed to put {nameof(PooledSocket)} {i} in Pool");
362366
}
@@ -561,9 +565,22 @@ public async Task<IPooledSocketResult> AcquireAsync()
561565

562566
try
563567
{
564-
retval.Reset();
568+
var resetTask = retval.ResetAsync();
565569

566-
message = "Socket was reset. " + retval.InstanceId;
570+
if (await Task.WhenAny(resetTask, Task.Delay(_receiveTimeout)) == resetTask)
571+
{
572+
await resetTask;
573+
}
574+
else
575+
{
576+
message = "Timeout to reset an acquired socket. InstanceId " + retval.InstanceId;
577+
_logger.LogError(message);
578+
MarkAsDead();
579+
result.Fail(message);
580+
return result;
581+
}
582+
583+
message = "Socket was reset. InstanceId " + retval.InstanceId;
567584
if (_isDebugEnabled) _logger.LogDebug(message);
568585

569586
result.Pass(message);
@@ -796,7 +813,7 @@ protected internal virtual async Task<PooledSocket> CreateSocketAsync()
796813
}
797814
catch (Exception ex)
798815
{
799-
var endPointStr = _endPoint.ToString().Replace("Unspecified/", string.Empty);
816+
var endPointStr = _endPoint.ToString().Replace("Unspecified/", string.Empty);
800817
_logger.LogError(ex, $"Failed to {nameof(CreateSocketAsync)} to {endPointStr}");
801818
throw;
802819
}
@@ -874,7 +891,7 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
874891
_logger.LogDebug("pooledSocket.WriteAsync...");
875892

876893
var writeSocketTask = pooledSocket.WriteAsync(b);
877-
if(await Task.WhenAny(writeSocketTask, Task.Delay(_config.ConnectionTimeout)) != writeSocketTask)
894+
if (await Task.WhenAny(writeSocketTask, Task.Delay(_config.ConnectionTimeout)) != writeSocketTask)
878895
{
879896
result.Fail("Timeout to pooledSocket.WriteAsync");
880897
return result;

0 commit comments

Comments
 (0)