Skip to content

Commit 136338e

Browse files
author
Sebastien Stormacq
committed
Merge branch 'sebsto/fix_584' into sebsto/multiple_continuations
2 parents d330b02 + 0566ef5 commit 136338e

File tree

2 files changed

+19
-17
lines changed

2 files changed

+19
-17
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ internal struct LambdaHTTPServer {
272272

273273
// for streaming requests, push a partial head response
274274
if self.isStreamingResponse(requestHead) {
275-
await self.responsePool.push(
275+
self.responsePool.push(
276276
LocalServerResponse(
277277
id: requestId,
278278
status: .ok
@@ -286,7 +286,7 @@ internal struct LambdaHTTPServer {
286286
// if this is a request from a Streaming Lambda Handler,
287287
// stream the response instead of buffering it
288288
if self.isStreamingResponse(requestHead) {
289-
await self.responsePool.push(
289+
self.responsePool.push(
290290
LocalServerResponse(id: requestId, body: body)
291291
)
292292
} else {
@@ -298,7 +298,7 @@ internal struct LambdaHTTPServer {
298298

299299
if self.isStreamingResponse(requestHead) {
300300
// for streaming response, send the final response
301-
await self.responsePool.push(
301+
self.responsePool.push(
302302
LocalServerResponse(id: requestId, final: true)
303303
)
304304
} else {
@@ -390,7 +390,7 @@ internal struct LambdaHTTPServer {
390390
logger[metadataKey: "requestId"] = "\(requestId)"
391391

392392
logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response")
393-
await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
393+
self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
394394

395395
// wait for the lambda function to process the request
396396
for try await response in self.responsePool {
@@ -463,7 +463,7 @@ internal struct LambdaHTTPServer {
463463
}
464464
// enqueue the lambda function response to be served as response to the client /invoke
465465
logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
466-
await self.responsePool.push(
466+
self.responsePool.push(
467467
LocalServerResponse(
468468
id: requestId,
469469
status: .accepted,
@@ -494,7 +494,7 @@ internal struct LambdaHTTPServer {
494494
}
495495
// enqueue the lambda function response to be served as response to the client /invoke
496496
logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
497-
await self.responsePool.push(
497+
self.responsePool.push(
498498
LocalServerResponse(
499499
id: requestId,
500500
status: .internalServerError,
@@ -560,21 +560,21 @@ internal struct LambdaHTTPServer {
560560
typealias Element = T
561561

562562
struct State: ~Copyable {
563-
var actions: Deque<T>
563+
var actionQueue: Deque<T>
564564
var continuationQueue: Deque<CheckedContinuation<T, any Error>>
565565
}
566-
private let lock = Mutex<State>(State(actions: [], continuationQueue: []))
566+
private let lock = Mutex<State>(State(actionQueue: [], continuationQueue: []))
567567

568568
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
569-
public func push(_ action: T) async {
569+
public func push(_ action: T) {
570570

571571
// if the iterator is waiting for an element on `next()`, give it to it (resume the continuation)
572572
// otherwise, enqueue the element
573573
let maybeContinuation = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
574574
if let nextContinuation = state.continuationQueue.popFirst() {
575575
return nextContinuation
576576
} else {
577-
state.actions.append(action)
577+
state.actionQueue.append(action)
578578
return nil
579579
}
580580
}
@@ -592,7 +592,7 @@ internal struct LambdaHTTPServer {
592592
let (nextAction, nextContinuation) = self.lock.withLock {
593593
state -> (T?, CheckedContinuation<T, any Error>) in
594594
if let nextContinuation = state.continuationQueue.popFirst() {
595-
if let nextAction = state.actions.popFirst() {
595+
if let nextAction = state.actionQueue.popFirst() {
596596
return (nextAction, nextContinuation)
597597
} else {
598598
// next is called when there is no action to process, but there is a continuation waiting

Tests/AWSLambdaRuntimeTests/PoolTests.swift

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ struct PoolTests {
2424
let pool = LambdaHTTPServer.Pool<String>()
2525

2626
// Push values
27-
await pool.push("first")
28-
await pool.push("second")
27+
pool.push("first")
28+
pool.push("second")
2929

3030
// Iterate and verify order
3131
var values = [String]()
@@ -53,7 +53,9 @@ struct PoolTests {
5353
task.cancel()
5454

5555
// This should complete without receiving any values
56-
try await task.value
56+
do {
57+
try await task.value
58+
} catch is CancellationError {} // this might happen depending on the order on which the cancellation is handled
5759
}
5860

5961
@Test
@@ -78,7 +80,7 @@ struct PoolTests {
7880
try await withThrowingTaskGroup(of: Void.self) { group in
7981
for i in 0..<iterations {
8082
group.addTask {
81-
await pool.push(i)
83+
pool.push(i)
8284
}
8385
}
8486
try await group.waitForAll()
@@ -110,7 +112,7 @@ struct PoolTests {
110112
try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds
111113

112114
// Push a value
113-
await pool.push(expectedValue)
115+
pool.push(expectedValue)
114116

115117
// Wait for consumer to complete
116118
try await consumer.value
@@ -140,7 +142,7 @@ struct PoolTests {
140142
for p in 0..<producerCount {
141143
group.addTask {
142144
for i in 0..<messagesPerProducer {
143-
await pool.push(p * messagesPerProducer + i)
145+
pool.push(p * messagesPerProducer + i)
144146
}
145147
}
146148
}

0 commit comments

Comments
 (0)