Skip to content

Commit f611f1a

Browse files
author
Sebastien Stormacq
committed
WIP LocalServer : allow concurrent invocations of POST /invoke
1 parent 6d02848 commit f611f1a

File tree

1 file changed

+37
-55
lines changed

1 file changed

+37
-55
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 37 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -390,19 +390,7 @@ internal struct LambdaHTTPServer {
390390
logger[metadataKey: "requestId"] = "\(requestId)"
391391

392392
logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response")
393-
// detect concurrent invocations of POST and gently decline the requests while we're processing one.
394-
if await !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) {
395-
let response = LocalServerResponse(
396-
id: requestId,
397-
status: .badRequest,
398-
body: ByteBuffer(
399-
string:
400-
"It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)"
401-
)
402-
)
403-
try await self.sendResponse(response, outbound: outbound, logger: logger)
404-
return
405-
}
393+
await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
406394

407395
// wait for the lambda function to process the request
408396
for try await response in self.responsePool {
@@ -571,39 +559,26 @@ internal struct LambdaHTTPServer {
571559

572560
typealias Element = T
573561

574-
enum State: ~Copyable {
575-
case buffer(Deque<T>)
576-
case continuation(CheckedContinuation<T, any Error>?)
562+
struct State: ~Copyable {
563+
var actions : Deque<T>
564+
var continuationQueue: Deque<CheckedContinuation<T, any Error>>
577565
}
578-
579-
private let lock = Mutex<State>(.buffer([]))
566+
private let lock = Mutex<State>(State(actions: [], continuationQueue: []))
580567

581568
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
582-
/// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise
583-
@discardableResult
584-
public func push(_ invocation: T) async -> Bool {
569+
public func push(_ action: T) async {
585570

586-
// if the iterator is waiting for an element on `next()``, give it to it
571+
// if the iterator is waiting for an element on `next()`, give it to it (resume the continuation)
587572
// otherwise, enqueue the element
588573
let maybeContinuation = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
589-
switch consume state {
590-
case .continuation(let continuation):
591-
state = .buffer([])
592-
return continuation
593-
594-
case .buffer(var buffer):
595-
buffer.append(invocation)
596-
state = .buffer(buffer)
574+
if let nextContinuation = state.continuationQueue.popFirst() {
575+
return nextContinuation
576+
} else {
577+
state.actions.append(action)
597578
return nil
598579
}
599580
}
600-
601-
if let maybeContinuation {
602-
maybeContinuation.resume(returning: invocation)
603-
return true
604-
} else {
605-
return false
606-
}
581+
maybeContinuation?.resume(returning: action)
607582
}
608583

609584
func next() async throws -> T? {
@@ -614,34 +589,41 @@ internal struct LambdaHTTPServer {
614589

615590
return try await withTaskCancellationHandler {
616591
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
617-
let nextAction = self.lock.withLock { state -> T? in
618-
switch consume state {
619-
case .buffer(var buffer):
620-
if let first = buffer.popFirst() {
621-
state = .buffer(buffer)
622-
return first
592+
let (nextAction, nextContinuation) = self.lock.withLock { state -> (T?, CheckedContinuation<T, any Error>) in
593+
if let nextContinuation = state.continuationQueue.popFirst() {
594+
if let nextAction = state.actions.popFirst() {
595+
return (nextAction, nextContinuation)
623596
} else {
624-
state = .continuation(continuation)
625-
return nil
597+
// next is called when there is no action to process, but there is a continuation waiting
598+
// we need to enqueue the continuation to be resumed when the next action is available
599+
state.continuationQueue.append(nextContinuation)
600+
state.continuationQueue.append(continuation)
601+
return (nil, continuation)
602+
}
603+
} else {
604+
// we have no waiting continuation, let's consume the one we just received
605+
if let nextAction = state.actions.popFirst() {
606+
return (nextAction, continuation)
607+
} else {
608+
// there is no continuation and no action waiting,
609+
// enqueue the continuation for later usage
610+
state.continuationQueue.append(continuation)
611+
return (nil, continuation)
626612
}
627-
628-
case .continuation(_):
629-
fatalError("\(self.poolName) : Concurrent invocations to next(). This is not allowed.")
630613
}
631614
}
632615

616+
// there is no next action, ignore
633617
guard let nextAction else { return }
634618

635-
continuation.resume(returning: nextAction)
619+
// we have a next action and a next continuation, resume it
620+
nextContinuation.resume(returning: nextAction)
636621
}
637622
} onCancel: {
638623
self.lock.withLock { state in
639-
switch consume state {
640-
case .buffer(let buffer):
641-
state = .buffer(buffer)
642-
case .continuation(let continuation):
643-
continuation?.resume(throwing: CancellationError())
644-
state = .buffer([])
624+
// when a cancellation is received, resume all pending continuations
625+
for continuation in state.continuationQueue {
626+
continuation.resume(throwing: CancellationError())
645627
}
646628
}
647629
}

0 commit comments

Comments
 (0)