55namespace EventHook . Helpers
66{
77 /// <summary>
8- /// A concurrent queue facilitating async without locking
8+ /// A concurrent queue facilitating async dequeue
9+ /// Since our consumer is always single threaded no locking is needed
910 /// </summary>
1011 /// <typeparam name="T"></typeparam>
1112 internal class AsyncQueue < T >
@@ -19,66 +20,54 @@ internal AsyncQueue(CancellationToken taskCancellationToken)
1920 /// <summary>
2021 /// Backing queue
2122 /// </summary>
22- ConcurrentQueue < TaskResult > queue = new ConcurrentQueue < TaskResult > ( ) ;
23+ ConcurrentQueue < T > queue = new ConcurrentQueue < T > ( ) ;
2324
2425 /// <summary>
2526 /// Keeps a list of pending Dequeue tasks in FIFO order
2627 /// </summary>
27- ConcurrentQueue < TaskCompletionSource < TaskResult > > dequeueTasks
28- = new ConcurrentQueue < TaskCompletionSource < TaskResult > > ( ) ;
28+ ConcurrentQueue < TaskCompletionSource < T > > dequeueTasks
29+ = new ConcurrentQueue < TaskCompletionSource < T > > ( ) ;
2930
31+ /// <summary>
32+ /// Assumes a single threaded producer!
33+ /// </summary>
34+ /// <param name="value"></param>
3035 internal void Enqueue ( T value )
3136 {
32- queue . Enqueue ( new TaskResult ( ) { success = true , Data = value } ) ;
37+ queue . Enqueue ( value ) ;
3338
3439 //Set the earlist waiting Dequeue task
35- TaskCompletionSource < TaskResult > task ;
36- if ( dequeueTasks . TryDequeue ( out task ) )
40+ TaskCompletionSource < T > task ;
41+
42+ if ( dequeueTasks . TryDequeue ( out task ) )
3743 {
38- TaskResult result ;
39- //if dequeue failed it means another Task picked up the data
40- //set the result to false for this Task so that it will be retried
41- //otherwise return the result
42- if ( queue . TryDequeue ( out result ) )
43- {
44- task . SetResult ( result ) ;
45- }
46- else
47- {
48- task . SetResult ( new TaskResult ( ) { success = false } ) ;
49- }
50-
44+ //return the result
45+ T result ;
46+ queue . TryDequeue ( out result ) ;
47+ task . SetResult ( result ) ;
5148 }
5249
50+
5351 }
5452
53+ /// <summary>
54+ /// Assumes a single threaded consumer!
55+ /// </summary>
56+ /// <returns></returns>
5557 internal async Task < T > DequeueAsync ( )
5658 {
57- TaskResult result ;
59+ T result ;
60+
5861 queue . TryDequeue ( out result ) ;
5962
60- //try until we get a result
61- while ( result == null || ! result . success )
62- {
63- var tcs = new TaskCompletionSource < TaskResult > ( ) ;
64- //cancel the task if cancellation token was invoked
65- //will throw exception on await below if task was running when cancelled
66- taskCancellationToken . Register ( ( ) => tcs . TrySetCanceled ( ) ) ;
63+ var tcs = new TaskCompletionSource < T > ( ) ;
64+ taskCancellationToken . Register ( ( ) => tcs . TrySetCanceled ( ) ) ;
6765
68- dequeueTasks . Enqueue ( tcs ) ;
69- result = await tcs . Task ;
70- }
66+ dequeueTasks . Enqueue ( tcs ) ;
67+ result = await tcs . Task ;
7168
72- return result . Data ;
69+ return result ;
7370 }
7471
75- /// <summary>
76- /// To keep the dequeue result status
77- /// </summary>
78- internal class TaskResult
79- {
80- internal bool success { get ; set ; }
81- internal T Data { get ; set ; }
82- }
8372 }
8473}
0 commit comments