1212//
1313//===----------------------------------------------------------------------===//
1414
15+ import  NIOCore
1516import  Testing
1617
1718@testable   import  AWSLambdaRuntime
@@ -165,7 +166,7 @@ struct PoolTests {
165166        let  pool  =  LambdaHTTPServer . Pool < String > ( ) 
166167
167168        // Create two tasks that will both wait for elements to be available
168-         await  #expect( throws:  LambdaHTTPServer . Pool < Swift . String > . PoolError. self)  { 
169+         let   error   =   await  #expect( throws:  LambdaHTTPServer . Pool < String > . PoolError. self)  { 
169170            try await  withThrowingTaskGroup ( of:  Void . self)  {  group in 
170171
171172                // one of the two task will throw a PoolError
@@ -184,6 +185,329 @@ struct PoolTests {
184185                try await  group. waitForAll ( ) 
185186            } 
186187        } 
188+ 
189+         // Verify it's the correct error cause
190+         if  case . nextCalledTwice =  error? . cause { 
191+             // This is the expected error
192+         }  else  { 
193+             Issue . record ( " Expected nextCalledTwice error, got:  \( String ( describing:  error? . cause) ) " ) 
194+         } 
195+     } 
196+ 
197+     // MARK: - Invariant Tests for RequestId-specific functionality
198+ 
199+     @Test  
200+     @available ( LambdaSwift 2 . 0 ,  * )  
201+     func  testRequestIdSpecificNext( )  async  throws  { 
202+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
203+ 
204+         // Push responses with different requestIds
205+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req1 " ,  body:  ByteBuffer ( string:  " data1 " ) ) ) 
206+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req2 " ,  body:  ByteBuffer ( string:  " data2 " ) ) ) 
207+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req1 " ,  body:  ByteBuffer ( string:  " data3 " ) ) ) 
208+ 
209+         // Get specific responses
210+         let  response1  =  try await  pool. next ( for:  " req1 " ) 
211+         #expect( response1. requestId ==  " req1 " ) 
212+         #expect( String ( buffer:  response1. body!)  ==  " data1 " ) 
213+ 
214+         let  response2  =  try await  pool. next ( for:  " req2 " ) 
215+         #expect( response2. requestId ==  " req2 " ) 
216+         #expect( String ( buffer:  response2. body!)  ==  " data2 " ) 
217+ 
218+         let  response3  =  try await  pool. next ( for:  " req1 " ) 
219+         #expect( response3. requestId ==  " req1 " ) 
220+         #expect( String ( buffer:  response3. body!)  ==  " data3 " ) 
221+     } 
222+ 
223+     @Test  
224+     @available ( LambdaSwift 2 . 0 ,  * )  
225+     func  testStreamingResponsesWithSameRequestId( )  async  throws  { 
226+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
227+         let  requestId  =  " streaming-req " 
228+ 
229+         let  chunks  =  try await  withThrowingTaskGroup ( of:  [ String ] . self)  {  group in 
230+             // Start consumer task
231+             group. addTask  { 
232+                 var  chunks :  [ String ]  =  [ ] 
233+                 var  isComplete  =  false 
234+ 
235+                 while  !isComplete { 
236+                     let  response  =  try await  pool. next ( for:  requestId) 
237+                     if  let  body =  response. body { 
238+                         chunks. append ( String ( buffer:  body) ) 
239+                     } 
240+                     if  response. final { 
241+                         isComplete =  true 
242+                     } 
243+                 } 
244+                 return  chunks
245+             } 
246+ 
247+             // Start producer task
248+             group. addTask  { 
249+                 // Give consumer time to start waiting
250+                 try await  Task . sleep ( nanoseconds:  10_000_000 )   // 0.01 seconds
251+ 
252+                 // Push multiple chunks for the same requestId
253+                 pool. push ( 
254+                     LambdaHTTPServer . LocalServerResponse ( 
255+                         id:  requestId, 
256+                         body:  ByteBuffer ( string:  " chunk1 " ) , 
257+                         final:  false 
258+                     ) 
259+                 ) 
260+                 pool. push ( 
261+                     LambdaHTTPServer . LocalServerResponse ( 
262+                         id:  requestId, 
263+                         body:  ByteBuffer ( string:  " chunk2 " ) , 
264+                         final:  false 
265+                     ) 
266+                 ) 
267+                 pool. push ( 
268+                     LambdaHTTPServer . LocalServerResponse ( id:  requestId,  body:  ByteBuffer ( string:  " chunk3 " ) ,  final:  true ) 
269+                 ) 
270+ 
271+                 return  [ ]   // Producer doesn't return chunks
272+             } 
273+ 
274+             // Wait for consumer to complete and return its result
275+             for  try await  result  in  group { 
276+                 if  !result. isEmpty { 
277+                     group. cancelAll ( ) 
278+                     return  result
279+                 } 
280+             } 
281+             return  [ ] 
282+         } 
283+ 
284+         #expect( chunks ==  [ " chunk1 " ,  " chunk2 " ,  " chunk3 " ] ) 
285+     } 
286+ 
287+     @Test  
288+     @available ( LambdaSwift 2 . 0 ,  * )  
289+     func  testMixedWaitingModesError( )  async  throws  { 
290+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
291+ 
292+         let  error  =  await  #expect( throws:  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > . PoolError. self)  { 
293+             try await  withThrowingTaskGroup ( of:  Void . self)  {  group in 
294+                 // Start a FIFO consumer
295+                 group. addTask  { 
296+                     for  try await  _  in  pool { 
297+                         // This should block waiting for any item
298+                     } 
299+                 } 
300+ 
301+                 // Start a requestId-specific consumer after a delay
302+                 group. addTask  { 
303+                     // Give FIFO task time to start waiting
304+                     try await  Task . sleep ( nanoseconds:  10_000_000 )   // 0.01 seconds
305+ 
306+                     // Try to use requestId-specific next - should fail with mixedWaitingModes
307+                     _ =  try await  pool. next ( for:  " req1 " ) 
308+                 } 
309+ 
310+                 // Wait for the first task to complete (which should be the error)
311+                 try await  group. next ( ) 
312+                 group. cancelAll ( ) 
313+             } 
314+         } 
315+ 
316+         // Verify it's the correct error cause
317+         if  case . mixedWaitingModes =  error? . cause { 
318+             // This is the expected error
319+         }  else  { 
320+             Issue . record ( " Expected mixedWaitingModes error, got:  \( String ( describing:  error? . cause) ) " ) 
321+         } 
322+     } 
323+ 
324+     @Test  
325+     @available ( LambdaSwift 2 . 0 ,  * )  
326+     func  testMixedWaitingModesErrorReverse( )  async  throws  { 
327+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
328+ 
329+         let  error  =  await  #expect( throws:  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > . PoolError. self)  { 
330+             try await  withThrowingTaskGroup ( of:  Void . self)  {  group in 
331+                 // Start a requestId-specific consumer
332+                 group. addTask  { 
333+                     _ =  try await  pool. next ( for:  " req1 " ) 
334+                 } 
335+ 
336+                 // Start a FIFO consumer after a delay
337+                 group. addTask  { 
338+                     // Give specific task time to start waiting
339+                     try await  Task . sleep ( nanoseconds:  10_000_000 )   // 0.01 seconds
340+ 
341+                     // Try to use FIFO next - should fail with mixedWaitingModes
342+                     for  try await  _  in  pool { 
343+                         break 
344+                     } 
345+                 } 
346+ 
347+                 // Wait for the first task to complete (which should be the error)
348+                 try await  group. next ( ) 
349+                 group. cancelAll ( ) 
350+             } 
351+         } 
352+ 
353+         // Verify it's the correct error cause
354+         if  case . mixedWaitingModes =  error? . cause { 
355+             // This is the expected error
356+         }  else  { 
357+             Issue . record ( " Expected mixedWaitingModes error, got:  \( String ( describing:  error? . cause) ) " ) 
358+         } 
359+     } 
360+ 
361+     @Test  
362+     @available ( LambdaSwift 2 . 0 ,  * )  
363+     func  testDuplicateRequestIdWaitError( )  async  throws  { 
364+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
365+ 
366+         let  error  =  await  #expect( throws:  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > . PoolError. self)  { 
367+             try await  withThrowingTaskGroup ( of:  Void . self)  {  group in 
368+                 // Start first consumer waiting for specific requestId
369+                 group. addTask  { 
370+                     _ =  try await  pool. next ( for:  " req1 " ) 
371+                 } 
372+ 
373+                 // Start second consumer for same requestId after a delay
374+                 group. addTask  { 
375+                     // Give first task time to start waiting
376+                     try await  Task . sleep ( nanoseconds:  10_000_000 )   // 0.01 seconds
377+ 
378+                     // Try to wait for the same requestId - should fail
379+                     _ =  try await  pool. next ( for:  " req1 " ) 
380+                 } 
381+ 
382+                 // Wait for the first task to complete (which should be the error)
383+                 try await  group. next ( ) 
384+                 group. cancelAll ( ) 
385+             } 
386+         } 
387+ 
388+         // Verify it's the correct error cause and requestId
389+         if  case let  . duplicateRequestIdWait( requestId)  =  error? . cause { 
390+             #expect( requestId ==  " req1 " ) 
391+         }  else  { 
392+             Issue . record ( " Expected duplicateRequestIdWait error, got:  \( String ( describing:  error? . cause) ) " ) 
393+         } 
394+     } 
395+ 
396+     @Test  
397+     @available ( LambdaSwift 2 . 0 ,  * )  
398+     func  testConcurrentRequestIdConsumers( )  async  throws  { 
399+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
400+ 
401+         let  results  =  try await  withThrowingTaskGroup ( of:  ( String,  String) . self)  {  group in 
402+             // Start multiple consumers for different requestIds
403+             group. addTask  { 
404+                 let  response  =  try await  pool. next ( for:  " req1 " ) 
405+                 return  ( " req1 " ,  String ( buffer:  response. body!) ) 
406+             } 
407+ 
408+             group. addTask  { 
409+                 let  response  =  try await  pool. next ( for:  " req2 " ) 
410+                 return  ( " req2 " ,  String ( buffer:  response. body!) ) 
411+             } 
412+ 
413+             group. addTask  { 
414+                 let  response  =  try await  pool. next ( for:  " req3 " ) 
415+                 return  ( " req3 " ,  String ( buffer:  response. body!) ) 
416+             } 
417+ 
418+             // Start producer task
419+             group. addTask  { 
420+                 // Give tasks time to start waiting
421+                 try await  Task . sleep ( nanoseconds:  10_000_000 )   // 0.01 seconds
422+ 
423+                 // Push responses in different order
424+                 pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req3 " ,  body:  ByteBuffer ( string:  " data3 " ) ) ) 
425+                 pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req1 " ,  body:  ByteBuffer ( string:  " data1 " ) ) ) 
426+                 pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req2 " ,  body:  ByteBuffer ( string:  " data2 " ) ) ) 
427+ 
428+                 return  ( " producer " ,  " " )   // Producer doesn't return meaningful data
429+             } 
430+ 
431+             // Collect results from consumers
432+             var  consumerResults :  [ String :  String ]  =  [ : ] 
433+             for  try await  (requestId,  data)  in  group { 
434+                 if  requestId !=  " producer "  { 
435+                     consumerResults [ requestId]  =  data
436+                 } 
437+                 if  consumerResults. count ==  3  { 
438+                     group. cancelAll ( ) 
439+                     break 
440+                 } 
441+             } 
442+             return  consumerResults
443+         } 
444+ 
445+         // Verify each consumer gets the correct response
446+         #expect( results [ " req1 " ]  ==  " data1 " ) 
447+         #expect( results [ " req2 " ]  ==  " data2 " ) 
448+         #expect( results [ " req3 " ]  ==  " data3 " ) 
449+     } 
450+ 
451+     @Test  
452+     @available ( LambdaSwift 2 . 0 ,  * )  
453+     func  testCancellationCleansUpAllContinuations( )  async  throws  { 
454+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
455+ 
456+         // Test that cancellation properly cleans up all continuations
457+         do  { 
458+             try await  withThrowingTaskGroup ( of:  Void . self)  {  group in 
459+                 // Start multiple consumers for different requestIds
460+                 group. addTask  { 
461+                     _ =  try await  pool. next ( for:  " req1 " ) 
462+                 } 
463+ 
464+                 group. addTask  { 
465+                     _ =  try await  pool. next ( for:  " req2 " ) 
466+                 } 
467+ 
468+                 group. addTask  { 
469+                     _ =  try await  pool. next ( for:  " req3 " ) 
470+                 } 
471+ 
472+                 // Give tasks time to start waiting then cancel all
473+                 try await  Task . sleep ( nanoseconds:  10_000_000 )   // 0.01 seconds
474+                 group. cancelAll ( ) 
475+ 
476+                 try await  group. waitForAll ( ) 
477+             } 
478+         }  catch  is CancellationError  { 
479+             // Expected - tasks should be cancelled
480+         } 
481+ 
482+         // Pool should be back to clean state - verify by pushing and consuming normally
483+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " new-req " ,  body:  ByteBuffer ( string:  " new-data " ) ) ) 
484+         let  response  =  try await  pool. next ( for:  " new-req " ) 
485+         #expect( String ( buffer:  response. body!)  ==  " new-data " ) 
486+     } 
487+ 
488+     @Test  
489+     @available ( LambdaSwift 2 . 0 ,  * )  
490+     func  testBufferOrderingWithRequestIds( )  async  throws  { 
491+         let  pool  =  LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( ) 
492+ 
493+         // Push multiple responses for the same requestId
494+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req1 " ,  body:  ByteBuffer ( string:  " first " ) ) ) 
495+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req2 " ,  body:  ByteBuffer ( string:  " other " ) ) ) 
496+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req1 " ,  body:  ByteBuffer ( string:  " second " ) ) ) 
497+         pool. push ( LambdaHTTPServer . LocalServerResponse ( id:  " req1 " ,  body:  ByteBuffer ( string:  " third " ) ) ) 
498+ 
499+         // Consume in order - should get FIFO order for the same requestId
500+         let  first  =  try await  pool. next ( for:  " req1 " ) 
501+         #expect( String ( buffer:  first. body!)  ==  " first " ) 
502+ 
503+         let  second  =  try await  pool. next ( for:  " req1 " ) 
504+         #expect( String ( buffer:  second. body!)  ==  " second " ) 
505+ 
506+         let  other  =  try await  pool. next ( for:  " req2 " ) 
507+         #expect( String ( buffer:  other. body!)  ==  " other " ) 
508+ 
509+         let  third  =  try await  pool. next ( for:  " req1 " ) 
510+         #expect( String ( buffer:  third. body!)  ==  " third " ) 
187511    } 
188512
189513} 
0 commit comments