@@ -23,10 +23,9 @@ internal AsyncQueue(CancellationToken taskCancellationToken)
2323 ConcurrentQueue < T > queue = new ConcurrentQueue < T > ( ) ;
2424
2525 /// <summary>
26- /// Keeps a list of pending Dequeue tasks in FIFO order
26+ /// Keeps any pending Dequeue task to wake up once data arrives
2727 /// </summary>
28- ConcurrentQueue < TaskCompletionSource < T > > dequeueTasks
29- = new ConcurrentQueue < TaskCompletionSource < T > > ( ) ;
28+ TaskCompletionSource < T > dequeueTask ;
3029
3130 /// <summary>
3231 /// Assumes a single threaded producer!
@@ -36,18 +35,15 @@ internal void Enqueue(T value)
3635 {
3736 queue . Enqueue ( value ) ;
3837
39- //Set the earlist waiting Dequeue task
40- TaskCompletionSource < T > task ;
41-
42- if ( dequeueTasks . TryDequeue ( out task ) )
38+ //wake up the dequeue task with result
39+ if ( dequeueTask != null
40+ && ! dequeueTask . Task . IsCompleted )
4341 {
44- //return the result
4542 T result ;
4643 queue . TryDequeue ( out result ) ;
47- task . SetResult ( result ) ;
44+ dequeueTask . SetResult ( result ) ;
4845 }
4946
50-
5147 }
5248
5349 /// <summary>
@@ -57,14 +53,17 @@ internal void Enqueue(T value)
5753 internal async Task < T > DequeueAsync ( )
5854 {
5955 T result ;
60-
6156 queue . TryDequeue ( out result ) ;
6257
63- var tcs = new TaskCompletionSource < T > ( ) ;
64- taskCancellationToken . Register ( ( ) => tcs . TrySetCanceled ( ) ) ;
58+ if ( result != null )
59+ {
60+ return result ;
61+ }
6562
66- dequeueTasks . Enqueue ( tcs ) ;
67- result = await tcs . Task ;
63+ dequeueTask = new TaskCompletionSource < T > ( ) ;
64+ taskCancellationToken . Register ( ( ) => dequeueTask . TrySetCanceled ( ) ) ;
65+ result = await dequeueTask . Task ;
66+ dequeueTask = null ;
6867
6968 return result ;
7069 }
0 commit comments