Skip to content

Commit b1553d2

Browse files
sebstoSebastien StormacqCopilot
authored
Accept multiple POST /invoke requests to allow parallel testing (#585)
Closing #584 The LocalServer now queues concurrent `POST /invoke` requests from testing client applications and ensures that the requests are delivered to the Lambda Runtime one by one, just like the AWS Lambda Runtime environment does. The `Pool` has now two modes : pure FIFO (one element get exactly one `next()`) and one mode where multiple elements can get pushed and multiple `next(for requestId:String)` can be called concurrently. The two modes are needed because invocations are 1:1 (one `POST /invoke` is always by one matching `GET /next`) but responses are n:n (a response can have multiple chunks and concurrent invocations can trigger multiple `next(for requestId: String)` I made a couple of additional changes while working on this PR - I moved the `Pool` code in a separate file for improved readability - I removed an instance of `DispatchTime` that was hiding in the code, unnoticed until today - I removed the `async` requirement on `Pool.push(_)` function. This was not required (thank you @t089 for having reported this) - I removed the `fatalError()` that was in the `Pool` implementation. The pool now throws an error when `next()` is invoked concurrently, making it easier to test. - I added extensive unit tests to validate the Pool behavior - I added a test to verify that a rapid succession of client invocations are correctly queued and return no error - I moved a `continuation(resume:)` outside of a lock. Generally speaking, it's a bad idea to resume continuation while owning a lock. I suspect this is causing a error during test execution when we spawn and tear down mutliple `Task` very quickly. In some rare occasions, the test was failing with an invalid assertion in NIO : `NIOCore/NIOAsyncWriter.swift:177: Fatal error: Deinited NIOAsyncWriter without calling finish()` --------- Co-authored-by: Sebastien Stormacq <stormacq@amazon.lu> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e58d891 commit b1553d2

File tree

5 files changed

+735
-120
lines changed

5 files changed

+735
-120
lines changed

Examples/Streaming/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ You can test the function locally before deploying:
8282
swift run
8383

8484
# In another terminal, test with curl:
85-
curl -v \
85+
curl -v --output response.txt \
8686
--header "Content-Type: application/json" \
8787
--data '"this is not used"' \
8888
http://127.0.0.1:7000/invoke
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright SwiftAWSLambdaRuntime project authors
6+
// Copyright (c) Amazon.com, Inc. or its affiliates.
7+
// Licensed under Apache License v2.0
8+
//
9+
// See LICENSE.txt for license information
10+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
11+
//
12+
// SPDX-License-Identifier: Apache-2.0
13+
//
14+
//===----------------------------------------------------------------------===//
15+
16+
#if LocalServerSupport
17+
import DequeModule
18+
import Synchronization
19+
20+
@available(LambdaSwift 2.0, *)
21+
extension LambdaHTTPServer {
22+
23+
/// A shared data structure to store the current invocation or response requests and the continuation objects.
24+
/// This data structure is shared between instances of the HTTPHandler
25+
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
26+
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
27+
private let poolName: String
28+
internal init(name: String = "Pool") { self.poolName = name }
29+
30+
typealias Element = T
31+
32+
struct State {
33+
var buffer: Deque<T> = []
34+
var waitingForAny: CheckedContinuation<T, any Error>?
35+
var waitingForSpecific: [String: CheckedContinuation<T, any Error>] = [:]
36+
}
37+
38+
private let lock = Mutex<State>(State())
39+
40+
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
41+
public func push(_ item: T) {
42+
let continuationToResume = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
43+
// First check if there's a waiting continuation that can handle this item
44+
45+
// Check for FIFO waiter first
46+
if let continuation = state.waitingForAny {
47+
state.waitingForAny = nil
48+
return continuation
49+
}
50+
51+
// Check for specific waiter
52+
if let response = item as? LocalServerResponse,
53+
let requestId = response.requestId,
54+
let continuation = state.waitingForSpecific.removeValue(forKey: requestId)
55+
{
56+
return continuation
57+
}
58+
59+
// No waiting continuation, add to buffer
60+
state.buffer.append(item)
61+
return nil
62+
}
63+
64+
// Resume continuation outside the lock to prevent potential deadlocks
65+
continuationToResume?.resume(returning: item)
66+
}
67+
68+
/// Unified next() method that handles both FIFO and requestId-specific waiting
69+
private func _next(for requestId: String?) async throws -> T {
70+
// exit if the task is cancelled
71+
guard !Task.isCancelled else {
72+
throw CancellationError()
73+
}
74+
75+
return try await withTaskCancellationHandler {
76+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
77+
let nextAction: Result<T, PoolError>? = self.lock.withLock { state -> Result<T, PoolError>? in
78+
if let requestId = requestId {
79+
// Look for oldest (first) item for this requestId in buffer
80+
if let index = state.buffer.firstIndex(where: { item in
81+
if let response = item as? LocalServerResponse {
82+
return response.requestId == requestId
83+
}
84+
return false
85+
}) {
86+
let item = state.buffer.remove(at: index)
87+
return .success(item)
88+
} else {
89+
// Check for conflicting waiters
90+
if state.waitingForAny != nil {
91+
return .failure(PoolError(cause: .mixedWaitingModes))
92+
}
93+
if state.waitingForSpecific[requestId] != nil {
94+
return .failure(PoolError(cause: .duplicateRequestIdWait(requestId)))
95+
}
96+
97+
// No matching item, wait for it
98+
state.waitingForSpecific[requestId] = continuation
99+
return nil
100+
}
101+
} else {
102+
// FIFO mode - take first item
103+
if let first = state.buffer.popFirst() {
104+
return .success(first)
105+
} else {
106+
// Check for conflicting waiters
107+
if !state.waitingForSpecific.isEmpty {
108+
return .failure(PoolError(cause: .mixedWaitingModes))
109+
}
110+
if state.waitingForAny != nil {
111+
return .failure(PoolError(cause: .nextCalledTwice(state.waitingForAny!)))
112+
}
113+
114+
state.waitingForAny = continuation
115+
return nil
116+
}
117+
}
118+
}
119+
120+
switch nextAction {
121+
case .success(let item):
122+
continuation.resume(returning: item)
123+
case .failure(let error):
124+
if case let .nextCalledTwice(prevContinuation) = error.cause {
125+
prevContinuation.resume(throwing: error)
126+
}
127+
continuation.resume(throwing: error)
128+
case .none:
129+
// do nothing - continuation is stored in state
130+
break
131+
}
132+
}
133+
} onCancel: {
134+
// Ensure we properly handle cancellation by removing stored continuation
135+
let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation<T, any Error>] in
136+
var toCancel: [CheckedContinuation<T, any Error>] = []
137+
138+
if let continuation = state.waitingForAny {
139+
toCancel.append(continuation)
140+
state.waitingForAny = nil
141+
}
142+
143+
for continuation in state.waitingForSpecific.values {
144+
toCancel.append(continuation)
145+
}
146+
state.waitingForSpecific.removeAll()
147+
148+
return toCancel
149+
}
150+
151+
// Resume all continuations outside the lock to avoid potential deadlocks
152+
for continuation in continuationsToCancel {
153+
continuation.resume(throwing: CancellationError())
154+
}
155+
}
156+
}
157+
158+
/// Simple FIFO next() method - used by AsyncIteratorProtocol
159+
func next() async throws -> T? {
160+
try await _next(for: nil)
161+
}
162+
163+
/// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol
164+
func next(for requestId: String) async throws -> T {
165+
try await _next(for: requestId)
166+
}
167+
168+
func makeAsyncIterator() -> Pool {
169+
self
170+
}
171+
172+
struct PoolError: Error {
173+
let cause: Cause
174+
var message: String {
175+
switch self.cause {
176+
case .nextCalledTwice:
177+
return "Concurrent invocations to next(). This is not allowed."
178+
case .duplicateRequestIdWait(let requestId):
179+
return "Already waiting for requestId: \(requestId)"
180+
case .mixedWaitingModes:
181+
return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))"
182+
}
183+
}
184+
185+
enum Cause {
186+
case nextCalledTwice(CheckedContinuation<T, any Error>)
187+
case duplicateRequestIdWait(String)
188+
case mixedWaitingModes
189+
}
190+
}
191+
}
192+
}
193+
#endif

0 commit comments

Comments
 (0)