Skip to content

Commit a4c04e9

Browse files
author
Sebastien Stormacq
committed
initial commit to support multi concurrency on Lambda Managed Instances
1 parent 72865e4 commit a4c04e9

File tree

6 files changed

+118
-79
lines changed

6 files changed

+118
-79
lines changed

Sources/AWSLambdaRuntime/FoundationSupport/Lambda+JSON.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ extension LambdaRuntime {
137137
decoder: JSONDecoder = JSONDecoder(),
138138
encoder: JSONEncoder = JSONEncoder(),
139139
logger: Logger = Logger(label: "LambdaRuntime"),
140-
body: sending @escaping (Event, LambdaContext) async throws -> Output
140+
body: @Sendable @escaping (Event, LambdaContext) async throws -> Output
141141
)
142142
where
143143
Handler == LambdaCodableAdapter<
@@ -164,7 +164,7 @@ extension LambdaRuntime {
164164
public convenience init<Event: Decodable>(
165165
decoder: JSONDecoder = JSONDecoder(),
166166
logger: Logger = Logger(label: "LambdaRuntime"),
167-
body: sending @escaping (Event, LambdaContext) async throws -> Void
167+
body: @Sendable @escaping (Event, LambdaContext) async throws -> Void
168168
)
169169
where
170170
Handler == LambdaCodableAdapter<

Sources/AWSLambdaRuntime/Lambda+Codable.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import NIOCore
1717

1818
/// The protocol a decoder must conform to so that it can be used with ``LambdaCodableAdapter`` to decode incoming
1919
/// `ByteBuffer` events.
20-
public protocol LambdaEventDecoder {
20+
public protocol LambdaEventDecoder: Sendable {
2121
/// Decode the `ByteBuffer` representing the received event into the generic `Event` type
2222
/// the handler will receive.
2323
/// - Parameters:
@@ -29,7 +29,7 @@ public protocol LambdaEventDecoder {
2929

3030
/// The protocol an encoder must conform to so that it can be used with ``LambdaCodableAdapter`` to encode the generic
3131
/// ``LambdaOutputEncoder/Output`` object into a `ByteBuffer`.
32-
public protocol LambdaOutputEncoder {
32+
public protocol LambdaOutputEncoder: Sendable {
3333
associatedtype Output
3434

3535
/// Encode the generic type `Output` the handler has returned into a `ByteBuffer`.

Sources/AWSLambdaRuntime/LambdaHandlers.swift

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import NIOCore
2323
/// ``LambdaResponseStreamWriter/finish()`` or ``LambdaResponseStreamWriter/writeAndFinish(_:)``,
2424
/// the ``handle(_:responseWriter:context:)`` function is free to execute any background work.
2525
@available(LambdaSwift 2.0, *)
26-
public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
26+
public protocol StreamingLambdaHandler: Sendable, _Lambda_SendableMetatype {
2727
/// The handler function -- implement the business logic of the Lambda function here.
2828
/// - Parameters:
2929
/// - event: The invocation's input data.
@@ -48,7 +48,7 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
4848

4949
/// A writer object to write the Lambda response stream into. The HTTP response is started lazily.
5050
/// before the first call to ``write(_:)`` or ``writeAndFinish(_:)``.
51-
public protocol LambdaResponseStreamWriter {
51+
public protocol LambdaResponseStreamWriter: Sendable {
5252
/// Write a response part into the stream. Bytes written are streamed continually.
5353
/// - Parameter buffer: The buffer to write.
5454
/// - Parameter hasCustomHeaders: If `true`, the response will be sent with custom HTTP status code and headers.
@@ -69,7 +69,7 @@ public protocol LambdaResponseStreamWriter {
6969
/// - note: This handler protocol does not support response streaming because the output has to be encoded prior to it being sent, e.g. it is not possible to encode a partial/incomplete JSON string.
7070
/// This protocol also does not support the execution of background work after the response has been returned -- the ``LambdaWithBackgroundProcessingHandler`` protocol caters for such use-cases.
7171
@available(LambdaSwift 2.0, *)
72-
public protocol LambdaHandler {
72+
public protocol LambdaHandler: Sendable {
7373
/// Generic input type.
7474
/// The body of the request sent to Lambda will be decoded into this type for the handler to consume.
7575
associatedtype Event
@@ -92,7 +92,7 @@ public protocol LambdaHandler {
9292
/// ``LambdaWithBackgroundProcessingHandler/handle(_:outputWriter:context:)`` function is then
9393
/// free to implement any background work after the result has been sent to the AWS Lambda control plane.
9494
@available(LambdaSwift 2.0, *)
95-
public protocol LambdaWithBackgroundProcessingHandler {
95+
public protocol LambdaWithBackgroundProcessingHandler: Sendable {
9696
/// Generic input type.
9797
/// The body of the request sent to Lambda will be decoded into this type for the handler to consume.
9898
associatedtype Event
@@ -116,7 +116,7 @@ public protocol LambdaWithBackgroundProcessingHandler {
116116
/// Used with ``LambdaWithBackgroundProcessingHandler``.
117117
/// A mechanism to "return" an output from ``LambdaWithBackgroundProcessingHandler/handle(_:outputWriter:context:)`` without the function needing to
118118
/// have a return type and exit at that point. This allows for background work to be executed _after_ a response has been sent to the AWS Lambda response endpoint.
119-
public protocol LambdaResponseWriter<Output> {
119+
public protocol LambdaResponseWriter<Output>: Sendable {
120120
associatedtype Output
121121
/// Sends the generic ``LambdaResponseWriter/Output`` object (representing the computed result of the handler)
122122
/// to the AWS Lambda response endpoint.
@@ -157,18 +157,18 @@ public struct StreamingClosureHandler: StreamingLambdaHandler {
157157
/// A ``LambdaHandler`` conforming handler object that can be constructed with a closure.
158158
/// Allows for a handler to be defined in a clean manner, leveraging Swift's trailing closure syntax.
159159
@available(LambdaSwift 2.0, *)
160-
public struct ClosureHandler<Event: Decodable, Output>: LambdaHandler {
161-
let body: (Event, LambdaContext) async throws -> Output
160+
public struct ClosureHandler<Event: Decodable & Sendable, Output>: LambdaHandler {
161+
let body: @Sendable (Event, LambdaContext) async throws -> Output
162162

163163
/// Initialize with a closure handler over generic `Input` and `Output` types.
164164
/// - Parameter body: The handler function written as a closure.
165-
public init(body: sending @escaping (Event, LambdaContext) async throws -> Output) where Output: Encodable {
165+
public init(body: @Sendable @escaping (Event, LambdaContext) async throws -> Output) where Output: Encodable {
166166
self.body = body
167167
}
168168

169169
/// Initialize with a closure handler over a generic `Input` type, and a `Void` `Output`.
170170
/// - Parameter body: The handler function written as a closure.
171-
public init(body: @escaping (Event, LambdaContext) async throws -> Void) where Output == Void {
171+
public init(body: @Sendable @escaping (Event, LambdaContext) async throws -> Void) where Output == Void {
172172
self.body = body
173173
}
174174

@@ -210,7 +210,7 @@ extension LambdaRuntime {
210210
encoder: sending Encoder,
211211
decoder: sending Decoder,
212212
logger: Logger = Logger(label: "LambdaRuntime"),
213-
body: sending @escaping (Event, LambdaContext) async throws -> Output
213+
body: @Sendable @escaping (Event, LambdaContext) async throws -> Output
214214
)
215215
where
216216
Handler == LambdaCodableAdapter<
@@ -240,7 +240,7 @@ extension LambdaRuntime {
240240
public convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
241241
decoder: sending Decoder,
242242
logger: Logger = Logger(label: "LambdaRuntime"),
243-
body: sending @escaping (Event, LambdaContext) async throws -> Void
243+
body: @Sendable @escaping (Event, LambdaContext) async throws -> Void
244244
)
245245
where
246246
Handler == LambdaCodableAdapter<

Sources/AWSLambdaRuntime/LambdaRuntime.swift

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ private let _isRunning = Atomic<Bool>(false)
2525
@available(LambdaSwift 2.0, *)
2626
public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
2727
@usableFromInline
28-
/// we protect the handler behind a Mutex to ensure that we only ever have one copy of it
29-
let handlerStorage: SendingStorage<Handler>
28+
let handler: Handler
3029
@usableFromInline
3130
let logger: Logger
3231
@usableFromInline
@@ -37,7 +36,7 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
3736
eventLoop: EventLoop = Lambda.defaultEventLoop,
3837
logger: Logger = Logger(label: "LambdaRuntime")
3938
) {
40-
self.handlerStorage = SendingStorage(handler)
39+
self.handler = handler
4140
self.eventLoop = eventLoop
4241

4342
// by setting the log level here, we understand it can not be changed dynamically at runtime
@@ -74,45 +73,47 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
7473
_isRunning.store(false, ordering: .releasing)
7574
}
7675

77-
// The handler can be non-sendable, we want to ensure we only ever have one copy of it
78-
let handler = try? self.handlerStorage.get()
79-
guard let handler else {
80-
throw LambdaRuntimeError(code: .handlerCanOnlyBeGetOnce)
81-
}
82-
8376
// are we running inside an AWS Lambda runtime environment ?
8477
// AWS_LAMBDA_RUNTIME_API is set when running on Lambda
8578
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html
8679
if let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") {
8780

88-
let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
89-
let ip = String(ipAndPort[0])
90-
guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) }
91-
92-
do {
93-
try await LambdaRuntimeClient.withRuntimeClient(
94-
configuration: .init(ip: ip, port: port),
81+
// Get the max concurrency authorized by user when running on
82+
// Lambda Managed Instances
83+
// This is not documented anywhere, except the NodeJS runtime
84+
// https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/utils/env.ts#L34
85+
// and
86+
// https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/worker/ignition.ts#L12
87+
let maxConcurrency = Int(Lambda.env("AWS_LAMBDA_MAX_CONCURRENCY") ?? "1") ?? 1
88+
89+
// when max concurrency is 1, do not pay the overhead of launching a Task
90+
if maxConcurrency <= 1 {
91+
self.logger.trace("Starting one Runtime Interface Client")
92+
try await self.startRuntimeInterfaceClient(
93+
endpoint: runtimeEndpoint,
94+
handler: handler,
9595
eventLoop: self.eventLoop,
9696
logger: self.logger
97-
) { runtimeClient in
98-
try await Lambda.runLoop(
99-
runtimeClient: runtimeClient,
100-
handler: handler,
101-
logger: self.logger
102-
)
103-
}
104-
} catch {
105-
// catch top level errors that have not been handled until now
106-
// this avoids the runtime to crash and generate a backtrace
107-
if let error = error as? LambdaRuntimeError,
108-
error.code != .connectionToControlPlaneLost
109-
{
110-
// if the error is a LambdaRuntimeError but not a connection error,
111-
// we rethrow it to preserve existing behaviour
112-
self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"])
113-
throw error
114-
} else {
115-
self.logger.trace("LambdaRuntime.run() connection lost")
97+
)
98+
} else {
99+
try await withThrowingTaskGroup(of: Void.self) { group in
100+
101+
self.logger.trace("Starting \(maxConcurrency) Runtime Interface Clients")
102+
for i in 0..<maxConcurrency {
103+
104+
group.addTask {
105+
var logger = self.logger
106+
logger[metadataKey: "RIC"] = "\(i)"
107+
try await self.startRuntimeInterfaceClient(
108+
endpoint: runtimeEndpoint,
109+
handler: self.handler,
110+
eventLoop: self.eventLoop,
111+
logger: logger
112+
)
113+
}
114+
}
115+
// Wait for all tasks to complete and propagate any errors
116+
try await group.waitForAll()
116117
}
117118
}
118119

@@ -141,7 +142,7 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
141142
) { runtimeClient in
142143
try await Lambda.runLoop(
143144
runtimeClient: runtimeClient,
144-
handler: handler,
145+
handler: self.handler,
145146
logger: self.logger
146147
)
147148
}
@@ -152,4 +153,43 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
152153
#endif
153154
}
154155
}
156+
157+
internal func startRuntimeInterfaceClient(
158+
endpoint: String,
159+
handler: Handler,
160+
eventLoop: EventLoop,
161+
logger: Logger
162+
) async throws {
163+
164+
let ipAndPort = endpoint.split(separator: ":", maxSplits: 1)
165+
let ip = String(ipAndPort[0])
166+
guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) }
167+
168+
do {
169+
try await LambdaRuntimeClient.withRuntimeClient(
170+
configuration: .init(ip: ip, port: port),
171+
eventLoop: eventLoop,
172+
logger: logger
173+
) { runtimeClient in
174+
try await Lambda.runLoop(
175+
runtimeClient: runtimeClient,
176+
handler: handler,
177+
logger: logger
178+
)
179+
}
180+
} catch {
181+
// catch top level errors that have not been handled until now
182+
// this avoids the runtime to crash and generate a backtrace
183+
if let error = error as? LambdaRuntimeError,
184+
error.code != .connectionToControlPlaneLost
185+
{
186+
// if the error is a LambdaRuntimeError but not a connection error,
187+
// we rethrow it to preserve existing behaviour
188+
logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"])
189+
throw error
190+
} else {
191+
logger.trace("LambdaRuntime.run() connection lost")
192+
}
193+
}
194+
}
155195
}

Sources/AWSLambdaRuntime/LambdaRuntimeError.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ package struct LambdaRuntimeError: Error {
3434

3535
case missingLambdaRuntimeAPIEnvironmentVariable
3636
case runtimeCanOnlyBeStartedOnce
37-
case handlerCanOnlyBeGetOnce
3837
case invalidPort
3938
}
4039

Sources/AWSLambdaRuntime/Utils.swift

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -107,33 +107,33 @@ extension AmazonHeaders {
107107
}
108108
}
109109

110-
/// Temporary storage for value being sent from one isolation domain to another
111-
// use NIOLockedValueBox instead of Mutex to avoid compiler crashes on 6.0
112-
// see https://github.com/swiftlang/swift/issues/78048
113-
@usableFromInline
114-
struct SendingStorage<Value>: ~Copyable, @unchecked Sendable {
115-
@usableFromInline
116-
struct ValueAlreadySentError: Error {
117-
@usableFromInline
118-
init() {}
119-
}
110+
// /// Temporary storage for value being sent from one isolation domain to another
111+
// // use NIOLockedValueBox instead of Mutex to avoid compiler crashes on 6.0
112+
// // see https://github.com/swiftlang/swift/issues/78048
113+
// @usableFromInline
114+
// struct SendingStorage<Value>: ~Copyable, @unchecked Sendable {
115+
// @usableFromInline
116+
// struct ValueAlreadySentError: Error {
117+
// @usableFromInline
118+
// init() {}
119+
// }
120120

121-
@usableFromInline
122-
// let storage: Mutex<Value?>
123-
let storage: NIOLockedValueBox<Value?>
121+
// @usableFromInline
122+
// // let storage: Mutex<Value?>
123+
// let storage: NIOLockedValueBox<Value?>
124124

125-
@inlinable
126-
init(_ value: sending Value) {
127-
self.storage = .init(value)
128-
}
125+
// @inlinable
126+
// init(_ value: sending Value) {
127+
// self.storage = .init(value)
128+
// }
129129

130-
@inlinable
131-
func get() throws -> Value {
132-
// try self.storage.withLock {
133-
try self.storage.withLockedValue {
134-
guard let value = $0 else { throw ValueAlreadySentError() }
135-
$0 = nil
136-
return value
137-
}
138-
}
139-
}
130+
// @inlinable
131+
// func get() throws -> Value {
132+
// // try self.storage.withLock {
133+
// try self.storage.withLockedValue {
134+
// guard let value = $0 else { throw ValueAlreadySentError() }
135+
// $0 = nil
136+
// return value
137+
// }
138+
// }
139+
// }

0 commit comments

Comments
 (0)