Skip to content

Commit 1de4b19

Browse files
author
Sebastien Stormacq
committed
Gently decline subsequent POST /invoke request while the Lambda handler processes a request
1 parent 461c18a commit 1de4b19

File tree

2 files changed

+39
-10
lines changed

2 files changed

+39
-10
lines changed

Examples/Streaming/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ You can test the function locally before deploying:
8282
swift run
8383

8484
# In another terminal, test with curl:
85-
curl -v \
85+
curl -v --output response.txt \
8686
--header "Content-Type: application/json" \
8787
--data '"this is not used"' \
8888
http://127.0.0.1:7000/invoke

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ extension Lambda {
9595
internal struct LambdaHTTPServer {
9696
private let invocationEndpoint: String
9797

98-
private let invocationPool = Pool<LocalServerInvocation>()
99-
private let responsePool = Pool<LocalServerResponse>()
98+
private let invocationPool = Pool<LocalServerInvocation>(name: "Invocation Pool")
99+
private let responsePool = Pool<LocalServerResponse>(name: "Response Pool")
100100

101101
private init(
102102
invocationEndpoint: String?
@@ -388,8 +388,21 @@ internal struct LambdaHTTPServer {
388388
// we always accept the /invoke request and push them to the pool
389389
let requestId = "\(DispatchTime.now().uptimeNanoseconds)"
390390
logger[metadataKey: "requestId"] = "\(requestId)"
391+
391392
logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response")
392-
await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
393+
// detect concurrent invocations of POST and gently decline the requests while we're processing one.
394+
if await !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) {
395+
let response = LocalServerResponse(
396+
id: requestId,
397+
status: .badRequest,
398+
body: ByteBuffer(
399+
string:
400+
"It's illegal to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)"
401+
)
402+
)
403+
try await self.sendResponse(response, outbound: outbound, logger: logger)
404+
return
405+
}
393406

394407
// wait for the lambda function to process the request
395408
for try await response in self.responsePool {
@@ -410,7 +423,12 @@ internal struct LambdaHTTPServer {
410423
"Received response for a different request id",
411424
metadata: ["response requestId": "\(response.requestId ?? "")"]
412425
)
413-
// should we return an error here ? Or crash as this is probably a programming error?
426+
let response = LocalServerResponse(
427+
id: requestId,
428+
status: .badRequest,
429+
body: ByteBuffer(string: "The response Id not equal to the request Id.")
430+
)
431+
try await self.sendResponse(response, outbound: outbound, logger: logger)
414432
}
415433
}
416434
// What todo when there is no more responses to process?
@@ -548,6 +566,9 @@ internal struct LambdaHTTPServer {
548566
/// This data structure is shared between instances of the HTTPHandler
549567
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
550568
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
569+
private let poolName: String
570+
internal init(name: String) { self.poolName = name }
571+
551572
typealias Element = T
552573

553574
enum State: ~Copyable {
@@ -558,8 +579,11 @@ internal struct LambdaHTTPServer {
558579
private let lock = Mutex<State>(.buffer([]))
559580

560581
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
561-
public func push(_ invocation: T) async {
562-
// if the iterator is waiting for an element, give it to it
582+
/// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise
583+
@discardableResult
584+
public func push(_ invocation: T) async -> Bool {
585+
586+
// if the iterator is waiting for an element on `next()``, give it to it
563587
// otherwise, enqueue the element
564588
let maybeContinuation = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
565589
switch consume state {
@@ -574,7 +598,12 @@ internal struct LambdaHTTPServer {
574598
}
575599
}
576600

577-
maybeContinuation?.resume(returning: invocation)
601+
if let maybeContinuation {
602+
maybeContinuation.resume(returning: invocation)
603+
return true
604+
} else {
605+
return false
606+
}
578607
}
579608

580609
func next() async throws -> T? {
@@ -596,8 +625,8 @@ internal struct LambdaHTTPServer {
596625
return nil
597626
}
598627

599-
case .continuation:
600-
fatalError("Concurrent invocations to next(). This is illegal.")
628+
case .continuation(_):
629+
fatalError("\(self.poolName) : Concurrent invocations to next(). This is illegal.")
601630
}
602631
}
603632

0 commit comments

Comments
 (0)