Skip to content

Commit a2ce3be

Browse files
author
Sebastien Stormacq
committed
move pool to a separate file
1 parent 0ed15f9 commit a2ce3be

File tree

2 files changed

+213
-204
lines changed

2 files changed

+213
-204
lines changed
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
#if LocalServerSupport
2+
import DequeModule
3+
import Synchronization
4+
5+
@available(LambdaSwift 2.0, *)
6+
extension LambdaHTTPServer {
7+
8+
/// A shared data structure to store the current invocation or response requests and the continuation objects.
9+
/// This data structure is shared between instances of the HTTPHandler
10+
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
11+
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
12+
private let poolName: String
13+
internal init(name: String = "Pool") { self.poolName = name }
14+
15+
typealias Element = T
16+
17+
enum State: ~Copyable {
18+
case buffer(Deque<T>)
19+
// FIFO waiting (for invocations)
20+
case waitingForAny(CheckedContinuation<T, any Error>)
21+
// RequestId-based waiting (for responses)
22+
case waitingForSpecific([String: CheckedContinuation<T, any Error>])
23+
}
24+
25+
private let lock = Mutex<State>(.buffer([]))
26+
27+
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
28+
public func push(_ item: T) {
29+
let continuationToResume = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
30+
switch consume state {
31+
case .buffer(var buffer):
32+
buffer.append(item)
33+
state = .buffer(buffer)
34+
return nil
35+
36+
case .waitingForAny(let continuation):
37+
// Someone is waiting for any item (FIFO)
38+
state = .buffer([])
39+
return continuation
40+
41+
case .waitingForSpecific(var continuations):
42+
// Check if this item matches any waiting continuation
43+
if let response = item as? LocalServerResponse,
44+
let requestId = response.requestId,
45+
let continuation = continuations.removeValue(forKey: requestId)
46+
{
47+
// Found a matching continuation
48+
if continuations.isEmpty {
49+
state = .buffer([])
50+
} else {
51+
state = .waitingForSpecific(continuations)
52+
}
53+
return continuation
54+
} else {
55+
// No matching continuation, add to buffer
56+
var buffer = Deque<T>()
57+
buffer.append(item)
58+
state = .buffer(buffer)
59+
return nil
60+
}
61+
}
62+
}
63+
64+
// Resume continuation outside the lock to prevent potential deadlocks
65+
continuationToResume?.resume(returning: item)
66+
}
67+
68+
/// Unified next() method that handles both FIFO and requestId-specific waiting
69+
private func _next(for requestId: String?) async throws -> T {
70+
// exit if the task is cancelled
71+
guard !Task.isCancelled else {
72+
throw CancellationError()
73+
}
74+
75+
return try await withTaskCancellationHandler {
76+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
77+
let nextAction: Result<T, PoolError>? = self.lock.withLock { state -> Result<T, PoolError>? in
78+
switch consume state {
79+
case .buffer(var buffer):
80+
if let requestId = requestId {
81+
// Look for oldest (first) item for this requestId in buffer
82+
if let index = buffer.firstIndex(where: { item in
83+
if let response = item as? LocalServerResponse {
84+
return response.requestId == requestId
85+
}
86+
return false
87+
}) {
88+
let item = buffer.remove(at: index)
89+
state = .buffer(buffer)
90+
return .success(item)
91+
} else {
92+
// No matching item, wait for it
93+
var continuations: [String: CheckedContinuation<T, any Error>] = [:]
94+
continuations[requestId] = continuation
95+
state = .waitingForSpecific(continuations)
96+
return nil
97+
}
98+
} else {
99+
// FIFO mode - take first item
100+
if let first = buffer.popFirst() {
101+
state = .buffer(buffer)
102+
return .success(first)
103+
} else {
104+
state = .waitingForAny(continuation)
105+
return nil
106+
}
107+
}
108+
109+
case .waitingForAny(let previousContinuation):
110+
if requestId == nil {
111+
// Another FIFO call while already waiting
112+
state = .buffer([])
113+
return .failure(PoolError(cause: .nextCalledTwice(previousContinuation)))
114+
} else {
115+
// Can't mix FIFO and specific waiting
116+
state = .waitingForAny(previousContinuation)
117+
return .failure(PoolError(cause: .mixedWaitingModes))
118+
}
119+
120+
case .waitingForSpecific(var continuations):
121+
if let requestId = requestId {
122+
if continuations[requestId] != nil {
123+
// Already waiting for this requestId
124+
state = .waitingForSpecific(continuations)
125+
return .failure(PoolError(cause: .duplicateRequestIdWait(requestId)))
126+
} else {
127+
continuations[requestId] = continuation
128+
state = .waitingForSpecific(continuations)
129+
return nil
130+
}
131+
} else {
132+
// Can't mix FIFO and specific waiting
133+
state = .waitingForSpecific(continuations)
134+
return .failure(PoolError(cause: .mixedWaitingModes))
135+
}
136+
}
137+
}
138+
139+
switch nextAction {
140+
case .success(let item):
141+
continuation.resume(returning: item)
142+
case .failure(let error):
143+
if case let .nextCalledTwice(prevContinuation) = error.cause {
144+
prevContinuation.resume(throwing: error)
145+
}
146+
continuation.resume(throwing: error)
147+
case .none:
148+
// do nothing - continuation is stored in state
149+
break
150+
}
151+
}
152+
} onCancel: {
153+
// Ensure we properly handle cancellation by checking if we have a stored continuation
154+
let continuationsToCancel = self.lock.withLock { state -> [String: CheckedContinuation<T, any Error>] in
155+
switch consume state {
156+
case .buffer(let buffer):
157+
state = .buffer(buffer)
158+
return [:]
159+
case .waitingForAny(let continuation):
160+
state = .buffer([])
161+
return ["": continuation] // Use empty string as key for single continuation
162+
case .waitingForSpecific(let continuations):
163+
state = .buffer([])
164+
return continuations
165+
}
166+
}
167+
168+
// Resume all continuations outside the lock to avoid potential deadlocks
169+
for continuation in continuationsToCancel.values {
170+
continuation.resume(throwing: CancellationError())
171+
}
172+
}
173+
}
174+
175+
/// Simple FIFO next() method - used by AsyncIteratorProtocol
176+
func next() async throws -> T? {
177+
try await _next(for: nil)
178+
}
179+
180+
/// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol
181+
func next(for requestId: String) async throws -> T {
182+
try await _next(for: requestId)
183+
}
184+
185+
func makeAsyncIterator() -> Pool {
186+
self
187+
}
188+
189+
struct PoolError: Error {
190+
let cause: Cause
191+
var message: String {
192+
switch self.cause {
193+
case .nextCalledTwice:
194+
return "Concurrent invocations to next(). This is not allowed."
195+
case .duplicateRequestIdWait(let requestId):
196+
return "Already waiting for requestId: \(requestId)"
197+
case .mixedWaitingModes:
198+
return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))"
199+
}
200+
}
201+
202+
enum Cause {
203+
case nextCalledTwice(CheckedContinuation<T, any Error>)
204+
case duplicateRequestIdWait(String)
205+
case mixedWaitingModes
206+
}
207+
}
208+
}
209+
}
210+
#endif

0 commit comments

Comments
 (0)