Skip to content

Commit 856948f

Browse files
authored
Merge pull request #152 from cnblogs/implement-reset-async
Implement ResetAsync
2 parents e6487fc + 8e583a6 commit 856948f

File tree

2 files changed

+48
-11
lines changed

2 files changed

+48
-11
lines changed

Enyim.Caching/Memcached/MemcachedNode.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ private class InternalPoolImpl : IDisposable
278278
private MemcachedNode ownerNode;
279279
private readonly EndPoint _endPoint;
280280
private readonly TimeSpan queueTimeout;
281+
private readonly TimeSpan _receiveTimeout;
281282
private SemaphoreSlim _semaphore;
282283

283284
private readonly object initLock = new Object();
@@ -293,11 +294,14 @@ internal InternalPoolImpl(
293294
throw new InvalidOperationException("maxItems must be larger than minItems", null);
294295
if (config.QueueTimeout < TimeSpan.Zero)
295296
throw new InvalidOperationException("queueTimeout must be >= TimeSpan.Zero", null);
297+
if (config.ReceiveTimeout < TimeSpan.Zero)
298+
throw new InvalidOperationException("ReceiveTimeout must be >= TimeSpan.Zero", null);
296299

297300
this.ownerNode = ownerNode;
298301
this.isAlive = true;
299302
_endPoint = ownerNode.EndPoint;
300303
this.queueTimeout = config.QueueTimeout;
304+
_receiveTimeout = config.ReceiveTimeout;
301305

302306
this.minItems = config.MinPoolSize;
303307
this.maxItems = config.MaxPoolSize;
@@ -356,7 +360,7 @@ internal async Task InitPoolAsync()
356360
{
357361
_freeItems.Push(await CreateSocketAsync());
358362
}
359-
catch(Exception ex)
363+
catch (Exception ex)
360364
{
361365
_logger.LogError(ex, $"Failed to put {nameof(PooledSocket)} {i} in Pool");
362366
}
@@ -561,9 +565,22 @@ public async Task<IPooledSocketResult> AcquireAsync()
561565

562566
try
563567
{
564-
retval.Reset();
568+
var resetTask = retval.ResetAsync();
565569

566-
message = "Socket was reset. " + retval.InstanceId;
570+
if (await Task.WhenAny(resetTask, Task.Delay(_receiveTimeout)) == resetTask)
571+
{
572+
await resetTask;
573+
}
574+
else
575+
{
576+
message = "Timeout to reset an acquired socket. InstanceId " + retval.InstanceId;
577+
_logger.LogError(message);
578+
MarkAsDead();
579+
result.Fail(message);
580+
return result;
581+
}
582+
583+
message = "Socket was reset. InstanceId " + retval.InstanceId;
567584
if (_isDebugEnabled) _logger.LogDebug(message);
568585

569586
result.Pass(message);
@@ -796,7 +813,7 @@ protected internal virtual async Task<PooledSocket> CreateSocketAsync()
796813
}
797814
catch (Exception ex)
798815
{
799-
var endPointStr = _endPoint.ToString().Replace("Unspecified/", string.Empty);
816+
var endPointStr = _endPoint.ToString().Replace("Unspecified/", string.Empty);
800817
_logger.LogError(ex, $"Failed to {nameof(CreateSocketAsync)} to {endPointStr}");
801818
throw;
802819
}
@@ -874,7 +891,7 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
874891
_logger.LogDebug("pooledSocket.WriteAsync...");
875892

876893
var writeSocketTask = pooledSocket.WriteAsync(b);
877-
if(await Task.WhenAny(writeSocketTask, Task.Delay(_config.ConnectionTimeout)) != writeSocketTask)
894+
if (await Task.WhenAny(writeSocketTask, Task.Delay(_config.ConnectionTimeout)) != writeSocketTask)
878895
{
879896
result.Fail("Timeout to pooledSocket.WriteAsync");
880897
return result;

Enyim.Caching/Memcached/PooledSocket.cs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,7 @@ public int Available
158158

159159
public void Reset()
160160
{
161-
// discard any buffered data
162-
_inputStream.Flush();
161+
// _inputStream.Flush();
163162

164163
int available = _socket.Available;
165164

@@ -168,18 +167,39 @@ public void Reset()
168167
if (_logger.IsEnabled(LogLevel.Warning))
169168
_logger.LogWarning(
170169
"Socket bound to {0} has {1} unread data! This is probably a bug in the code. InstanceID was {2}.",
171-
_socket.RemoteEndPoint, available, this.InstanceId);
170+
_socket.RemoteEndPoint, available, InstanceId);
172171

173172
byte[] data = new byte[available];
174173

175-
this.Read(data, 0, available);
174+
Read(data, 0, available);
175+
}
176176

177+
if (_logger.IsEnabled(LogLevel.Debug))
178+
_logger.LogDebug("Socket {0} was reset", InstanceId);
179+
}
180+
181+
public async Task ResetAsync()
182+
{
183+
// await _inputStream.FlushAsync();
184+
185+
int available = _socket.Available;
186+
187+
if (available > 0)
188+
{
177189
if (_logger.IsEnabled(LogLevel.Warning))
178-
_logger.LogWarning(Encoding.ASCII.GetString(data));
190+
{
191+
_logger.LogWarning(
192+
"Socket bound to {0} has {1} unread data! This is probably a bug in the code. InstanceID was {2}.",
193+
_socket.RemoteEndPoint, available, InstanceId);
194+
}
195+
196+
byte[] data = new byte[available];
197+
198+
await ReadAsync(data, 0, available);
179199
}
180200

181201
if (_logger.IsEnabled(LogLevel.Debug))
182-
_logger.LogDebug("Socket {0} was reset", this.InstanceId);
202+
_logger.LogDebug("Socket {0} was reset", InstanceId);
183203
}
184204

185205
/// <summary>

0 commit comments

Comments
 (0)