From a4c04e93b7394d2e78088691829a9aa4733f0895 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sat, 6 Dec 2025 17:32:09 -0600 Subject: [PATCH 1/8] initial commit to support multi concurrency on Lambda Managed Instances --- .../FoundationSupport/Lambda+JSON.swift | 4 +- Sources/AWSLambdaRuntime/Lambda+Codable.swift | 4 +- Sources/AWSLambdaRuntime/LambdaHandlers.swift | 22 ++-- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 112 ++++++++++++------ .../AWSLambdaRuntime/LambdaRuntimeError.swift | 1 - Sources/AWSLambdaRuntime/Utils.swift | 54 ++++----- 6 files changed, 118 insertions(+), 79 deletions(-) diff --git a/Sources/AWSLambdaRuntime/FoundationSupport/Lambda+JSON.swift b/Sources/AWSLambdaRuntime/FoundationSupport/Lambda+JSON.swift index 646e77f9..fa2cdb98 100644 --- a/Sources/AWSLambdaRuntime/FoundationSupport/Lambda+JSON.swift +++ b/Sources/AWSLambdaRuntime/FoundationSupport/Lambda+JSON.swift @@ -137,7 +137,7 @@ extension LambdaRuntime { decoder: JSONDecoder = JSONDecoder(), encoder: JSONEncoder = JSONEncoder(), logger: Logger = Logger(label: "LambdaRuntime"), - body: sending @escaping (Event, LambdaContext) async throws -> Output + body: @Sendable @escaping (Event, LambdaContext) async throws -> Output ) where Handler == LambdaCodableAdapter< @@ -164,7 +164,7 @@ extension LambdaRuntime { public convenience init( decoder: JSONDecoder = JSONDecoder(), logger: Logger = Logger(label: "LambdaRuntime"), - body: sending @escaping (Event, LambdaContext) async throws -> Void + body: @Sendable @escaping (Event, LambdaContext) async throws -> Void ) where Handler == LambdaCodableAdapter< diff --git a/Sources/AWSLambdaRuntime/Lambda+Codable.swift b/Sources/AWSLambdaRuntime/Lambda+Codable.swift index f20f5a47..9d4e732e 100644 --- a/Sources/AWSLambdaRuntime/Lambda+Codable.swift +++ b/Sources/AWSLambdaRuntime/Lambda+Codable.swift @@ -17,7 +17,7 @@ import NIOCore /// The protocol a decoder must conform to so that it can be used with ``LambdaCodableAdapter`` to decode incoming /// `ByteBuffer` events. -public protocol LambdaEventDecoder { +public protocol LambdaEventDecoder: Sendable { /// Decode the `ByteBuffer` representing the received event into the generic `Event` type /// the handler will receive. /// - Parameters: @@ -29,7 +29,7 @@ public protocol LambdaEventDecoder { /// The protocol an encoder must conform to so that it can be used with ``LambdaCodableAdapter`` to encode the generic /// ``LambdaOutputEncoder/Output`` object into a `ByteBuffer`. -public protocol LambdaOutputEncoder { +public protocol LambdaOutputEncoder: Sendable { associatedtype Output /// Encode the generic type `Output` the handler has returned into a `ByteBuffer`. diff --git a/Sources/AWSLambdaRuntime/LambdaHandlers.swift b/Sources/AWSLambdaRuntime/LambdaHandlers.swift index 9ff33121..349d48c9 100644 --- a/Sources/AWSLambdaRuntime/LambdaHandlers.swift +++ b/Sources/AWSLambdaRuntime/LambdaHandlers.swift @@ -23,7 +23,7 @@ import NIOCore /// ``LambdaResponseStreamWriter/finish()`` or ``LambdaResponseStreamWriter/writeAndFinish(_:)``, /// the ``handle(_:responseWriter:context:)`` function is free to execute any background work. @available(LambdaSwift 2.0, *) -public protocol StreamingLambdaHandler: _Lambda_SendableMetatype { +public protocol StreamingLambdaHandler: Sendable, _Lambda_SendableMetatype { /// The handler function -- implement the business logic of the Lambda function here. /// - Parameters: /// - event: The invocation's input data. @@ -48,7 +48,7 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype { /// A writer object to write the Lambda response stream into. The HTTP response is started lazily. /// before the first call to ``write(_:)`` or ``writeAndFinish(_:)``. -public protocol LambdaResponseStreamWriter { +public protocol LambdaResponseStreamWriter: Sendable { /// Write a response part into the stream. Bytes written are streamed continually. /// - Parameter buffer: The buffer to write. /// - Parameter hasCustomHeaders: If `true`, the response will be sent with custom HTTP status code and headers. @@ -69,7 +69,7 @@ public protocol LambdaResponseStreamWriter { /// - note: This handler protocol does not support response streaming because the output has to be encoded prior to it being sent, e.g. it is not possible to encode a partial/incomplete JSON string. /// This protocol also does not support the execution of background work after the response has been returned -- the ``LambdaWithBackgroundProcessingHandler`` protocol caters for such use-cases. @available(LambdaSwift 2.0, *) -public protocol LambdaHandler { +public protocol LambdaHandler: Sendable { /// Generic input type. /// The body of the request sent to Lambda will be decoded into this type for the handler to consume. associatedtype Event @@ -92,7 +92,7 @@ public protocol LambdaHandler { /// ``LambdaWithBackgroundProcessingHandler/handle(_:outputWriter:context:)`` function is then /// free to implement any background work after the result has been sent to the AWS Lambda control plane. @available(LambdaSwift 2.0, *) -public protocol LambdaWithBackgroundProcessingHandler { +public protocol LambdaWithBackgroundProcessingHandler: Sendable { /// Generic input type. /// The body of the request sent to Lambda will be decoded into this type for the handler to consume. associatedtype Event @@ -116,7 +116,7 @@ public protocol LambdaWithBackgroundProcessingHandler { /// Used with ``LambdaWithBackgroundProcessingHandler``. /// A mechanism to "return" an output from ``LambdaWithBackgroundProcessingHandler/handle(_:outputWriter:context:)`` without the function needing to /// have a return type and exit at that point. This allows for background work to be executed _after_ a response has been sent to the AWS Lambda response endpoint. -public protocol LambdaResponseWriter { +public protocol LambdaResponseWriter: Sendable { associatedtype Output /// Sends the generic ``LambdaResponseWriter/Output`` object (representing the computed result of the handler) /// to the AWS Lambda response endpoint. @@ -157,18 +157,18 @@ public struct StreamingClosureHandler: StreamingLambdaHandler { /// A ``LambdaHandler`` conforming handler object that can be constructed with a closure. /// Allows for a handler to be defined in a clean manner, leveraging Swift's trailing closure syntax. @available(LambdaSwift 2.0, *) -public struct ClosureHandler: LambdaHandler { - let body: (Event, LambdaContext) async throws -> Output +public struct ClosureHandler: LambdaHandler { + let body: @Sendable (Event, LambdaContext) async throws -> Output /// Initialize with a closure handler over generic `Input` and `Output` types. /// - Parameter body: The handler function written as a closure. - public init(body: sending @escaping (Event, LambdaContext) async throws -> Output) where Output: Encodable { + public init(body: @Sendable @escaping (Event, LambdaContext) async throws -> Output) where Output: Encodable { self.body = body } /// Initialize with a closure handler over a generic `Input` type, and a `Void` `Output`. /// - Parameter body: The handler function written as a closure. - public init(body: @escaping (Event, LambdaContext) async throws -> Void) where Output == Void { + public init(body: @Sendable @escaping (Event, LambdaContext) async throws -> Void) where Output == Void { self.body = body } @@ -210,7 +210,7 @@ extension LambdaRuntime { encoder: sending Encoder, decoder: sending Decoder, logger: Logger = Logger(label: "LambdaRuntime"), - body: sending @escaping (Event, LambdaContext) async throws -> Output + body: @Sendable @escaping (Event, LambdaContext) async throws -> Output ) where Handler == LambdaCodableAdapter< @@ -240,7 +240,7 @@ extension LambdaRuntime { public convenience init( decoder: sending Decoder, logger: Logger = Logger(label: "LambdaRuntime"), - body: sending @escaping (Event, LambdaContext) async throws -> Void + body: @Sendable @escaping (Event, LambdaContext) async throws -> Void ) where Handler == LambdaCodableAdapter< diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 2a8acc7a..1b599354 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -25,8 +25,7 @@ private let _isRunning = Atomic(false) @available(LambdaSwift 2.0, *) public final class LambdaRuntime: Sendable where Handler: StreamingLambdaHandler { @usableFromInline - /// we protect the handler behind a Mutex to ensure that we only ever have one copy of it - let handlerStorage: SendingStorage + let handler: Handler @usableFromInline let logger: Logger @usableFromInline @@ -37,7 +36,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb eventLoop: EventLoop = Lambda.defaultEventLoop, logger: Logger = Logger(label: "LambdaRuntime") ) { - self.handlerStorage = SendingStorage(handler) + self.handler = handler self.eventLoop = eventLoop // by setting the log level here, we understand it can not be changed dynamically at runtime @@ -74,45 +73,47 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb _isRunning.store(false, ordering: .releasing) } - // The handler can be non-sendable, we want to ensure we only ever have one copy of it - let handler = try? self.handlerStorage.get() - guard let handler else { - throw LambdaRuntimeError(code: .handlerCanOnlyBeGetOnce) - } - // are we running inside an AWS Lambda runtime environment ? // AWS_LAMBDA_RUNTIME_API is set when running on Lambda // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html if let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") { - let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1) - let ip = String(ipAndPort[0]) - guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) } - - do { - try await LambdaRuntimeClient.withRuntimeClient( - configuration: .init(ip: ip, port: port), + // Get the max concurrency authorized by user when running on + // Lambda Managed Instances + // This is not documented anywhere, except the NodeJS runtime + // https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/utils/env.ts#L34 + // and + // https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/worker/ignition.ts#L12 + let maxConcurrency = Int(Lambda.env("AWS_LAMBDA_MAX_CONCURRENCY") ?? "1") ?? 1 + + // when max concurrency is 1, do not pay the overhead of launching a Task + if maxConcurrency <= 1 { + self.logger.trace("Starting one Runtime Interface Client") + try await self.startRuntimeInterfaceClient( + endpoint: runtimeEndpoint, + handler: handler, eventLoop: self.eventLoop, logger: self.logger - ) { runtimeClient in - try await Lambda.runLoop( - runtimeClient: runtimeClient, - handler: handler, - logger: self.logger - ) - } - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - throw error - } else { - self.logger.trace("LambdaRuntime.run() connection lost") + ) + } else { + try await withThrowingTaskGroup(of: Void.self) { group in + + self.logger.trace("Starting \(maxConcurrency) Runtime Interface Clients") + for i in 0..: Sendable where Handler: StreamingLamb ) { runtimeClient in try await Lambda.runLoop( runtimeClient: runtimeClient, - handler: handler, + handler: self.handler, logger: self.logger ) } @@ -152,4 +153,43 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb #endif } } + + internal func startRuntimeInterfaceClient( + endpoint: String, + handler: Handler, + eventLoop: EventLoop, + logger: Logger + ) async throws { + + let ipAndPort = endpoint.split(separator: ":", maxSplits: 1) + let ip = String(ipAndPort[0]) + guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) } + + do { + try await LambdaRuntimeClient.withRuntimeClient( + configuration: .init(ip: ip, port: port), + eventLoop: eventLoop, + logger: logger + ) { runtimeClient in + try await Lambda.runLoop( + runtimeClient: runtimeClient, + handler: handler, + logger: logger + ) + } + } catch { + // catch top level errors that have not been handled until now + // this avoids the runtime to crash and generate a backtrace + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + throw error + } else { + logger.trace("LambdaRuntime.run() connection lost") + } + } + } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift index b9eada07..929688a6 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift @@ -34,7 +34,6 @@ package struct LambdaRuntimeError: Error { case missingLambdaRuntimeAPIEnvironmentVariable case runtimeCanOnlyBeStartedOnce - case handlerCanOnlyBeGetOnce case invalidPort } diff --git a/Sources/AWSLambdaRuntime/Utils.swift b/Sources/AWSLambdaRuntime/Utils.swift index 8a8d0442..71020ccb 100644 --- a/Sources/AWSLambdaRuntime/Utils.swift +++ b/Sources/AWSLambdaRuntime/Utils.swift @@ -107,33 +107,33 @@ extension AmazonHeaders { } } -/// Temporary storage for value being sent from one isolation domain to another -// use NIOLockedValueBox instead of Mutex to avoid compiler crashes on 6.0 -// see https://github.com/swiftlang/swift/issues/78048 -@usableFromInline -struct SendingStorage: ~Copyable, @unchecked Sendable { - @usableFromInline - struct ValueAlreadySentError: Error { - @usableFromInline - init() {} - } +// /// Temporary storage for value being sent from one isolation domain to another +// // use NIOLockedValueBox instead of Mutex to avoid compiler crashes on 6.0 +// // see https://github.com/swiftlang/swift/issues/78048 +// @usableFromInline +// struct SendingStorage: ~Copyable, @unchecked Sendable { +// @usableFromInline +// struct ValueAlreadySentError: Error { +// @usableFromInline +// init() {} +// } - @usableFromInline - // let storage: Mutex - let storage: NIOLockedValueBox +// @usableFromInline +// // let storage: Mutex +// let storage: NIOLockedValueBox - @inlinable - init(_ value: sending Value) { - self.storage = .init(value) - } +// @inlinable +// init(_ value: sending Value) { +// self.storage = .init(value) +// } - @inlinable - func get() throws -> Value { - // try self.storage.withLock { - try self.storage.withLockedValue { - guard let value = $0 else { throw ValueAlreadySentError() } - $0 = nil - return value - } - } -} +// @inlinable +// func get() throws -> Value { +// // try self.storage.withLock { +// try self.storage.withLockedValue { +// guard let value = $0 else { throw ValueAlreadySentError() } +// $0 = nil +// return value +// } +// } +// } From da7c2fb5a9219aeb58177940b8d603d2fdfa9613 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sat, 6 Dec 2025 17:46:06 -0600 Subject: [PATCH 2/8] swift-format --- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 1b599354..72ca95df 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -78,7 +78,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html if let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") { - // Get the max concurrency authorized by user when running on + // Get the max concurrency authorized by user when running on // Lambda Managed Instances // This is not documented anywhere, except the NodeJS runtime // https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/utils/env.ts#L34 @@ -102,7 +102,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb for i in 0..: Sendable where Handler: StreamingLamb } } // Wait for all tasks to complete and propagate any errors - try await group.waitForAll() + try await group.waitForAll() } } From 8615a63bca9bf244be85071c68815000182508a1 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sat, 6 Dec 2025 17:52:11 -0600 Subject: [PATCH 3/8] fix tests --- ...bdaResponseStreamWriter+HeadersTests.swift | 170 +++++++++++++----- 1 file changed, 123 insertions(+), 47 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift index c411dd24..d5e63905 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -16,6 +16,7 @@ import AWSLambdaRuntime import Logging import NIOCore +import Synchronization import Testing #if canImport(FoundationEssentials) @@ -537,10 +538,23 @@ struct LambdaResponseStreamWriterHeadersTests { // MARK: - Mock Implementation /// Mock implementation of LambdaResponseStreamWriter for testing -final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { - private(set) var writtenBuffers: [ByteBuffer] = [] - private(set) var isFinished = false - private(set) var hasCustomHeaders = false +@available(LambdaSwift 2.0, *) +final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter, Sendable { + private let _writtenBuffers: Mutex<[ByteBuffer]> = Mutex([]) + private let _isFinished: Mutex = Mutex(false) + private let _hasCustomHeaders: Mutex = Mutex(false) + + var writtenBuffers: [ByteBuffer] { + _writtenBuffers.withLock { $0 } + } + + var isFinished: Bool { + _isFinished.withLock { $0 } + } + + var hasCustomHeaders: Bool { + _hasCustomHeaders.withLock { $0 } + } // Add a JSON string with separator for writeStatusAndHeaders func writeStatusAndHeaders( @@ -559,30 +573,47 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { } func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws { - writtenBuffers.append(buffer) - self.hasCustomHeaders = hasCustomHeaders + _writtenBuffers.withLock { $0.append(buffer) } + _hasCustomHeaders.withLock { $0 = hasCustomHeaders } } func finish() async throws { - isFinished = true + _isFinished.withLock { $0 = true } } func writeAndFinish(_ buffer: ByteBuffer) async throws { - writtenBuffers.append(buffer) - isFinished = true + _writtenBuffers.withLock { $0.append(buffer) } + _isFinished.withLock { $0 = true } } } // MARK: - Error Handling Mock Implementations /// Mock implementation that fails on specific write calls for testing error propagation -final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { - private(set) var writtenBuffers: [ByteBuffer] = [] - private(set) var writeCallCount = 0 - private(set) var isFinished = false - private(set) var hasCustomHeaders = false +@available(LambdaSwift 2.0, *) +final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter, Sendable { + private let _writtenBuffers: Mutex<[ByteBuffer]> = Mutex([]) + private let _writeCallCount: Mutex = Mutex(0) + private let _isFinished: Mutex = Mutex(false) + private let _hasCustomHeaders: Mutex = Mutex(false) private let failOnWriteCall: Int + var writtenBuffers: [ByteBuffer] { + _writtenBuffers.withLock { $0 } + } + + var writeCallCount: Int { + _writeCallCount.withLock { $0 } + } + + var isFinished: Bool { + _isFinished.withLock { $0 } + } + + var hasCustomHeaders: Bool { + _hasCustomHeaders.withLock { $0 } + } + init(failOnWriteCall: Int) { self.failOnWriteCall = failOnWriteCall } @@ -597,18 +628,21 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { } func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws { - writeCallCount += 1 - self.hasCustomHeaders = hasCustomHeaders + let count = _writeCallCount.withLock { value in + value += 1 + return value + } + _hasCustomHeaders.withLock { $0 = hasCustomHeaders } - if writeCallCount == failOnWriteCall { + if count == failOnWriteCall { throw TestWriteError() } - writtenBuffers.append(buffer) + _writtenBuffers.withLock { $0.append(buffer) } } func finish() async throws { - isFinished = true + _isFinished.withLock { $0 = true } } func writeAndFinish(_ buffer: ByteBuffer) async throws { @@ -690,13 +724,38 @@ struct FailingJSONEncoder: LambdaOutputEncoder { // MARK: - Additional Mock Implementations for Integration Tests /// Mock implementation that tracks additional state for integration testing -final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { - private(set) var writtenBuffers: [ByteBuffer] = [] - private(set) var writeCallCount = 0 - private(set) var finishCallCount = 0 - private(set) var writeAndFinishCallCount = 0 - private(set) var isFinished = false - private(set) var hasCustomHeaders = false +@available(LambdaSwift 2.0, *) +final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter, Sendable { + private let _writtenBuffers: Mutex<[ByteBuffer]> = Mutex([]) + private let _writeCallCount: Mutex = Mutex(0) + private let _finishCallCount: Mutex = Mutex(0) + private let _writeAndFinishCallCount: Mutex = Mutex(0) + private let _isFinished: Mutex = Mutex(false) + private let _hasCustomHeaders: Mutex = Mutex(false) + + var writtenBuffers: [ByteBuffer] { + _writtenBuffers.withLock { $0 } + } + + var writeCallCount: Int { + _writeCallCount.withLock { $0 } + } + + var finishCallCount: Int { + _finishCallCount.withLock { $0 } + } + + var writeAndFinishCallCount: Int { + _writeAndFinishCallCount.withLock { $0 } + } + + var isFinished: Bool { + _isFinished.withLock { $0 } + } + + var hasCustomHeaders: Bool { + _hasCustomHeaders.withLock { $0 } + } func writeStatusAndHeaders( _ response: Response, @@ -708,36 +767,53 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { } func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws { - writeCallCount += 1 - self.hasCustomHeaders = hasCustomHeaders - writtenBuffers.append(buffer) + _writeCallCount.withLock { $0 += 1 } + _hasCustomHeaders.withLock { $0 = hasCustomHeaders } + _writtenBuffers.withLock { $0.append(buffer) } } func finish() async throws { - finishCallCount += 1 - isFinished = true + _finishCallCount.withLock { $0 += 1 } + _isFinished.withLock { $0 = true } } func writeAndFinish(_ buffer: ByteBuffer) async throws { - writeAndFinishCallCount += 1 - writtenBuffers.append(buffer) - isFinished = true + _writeAndFinishCallCount.withLock { $0 += 1 } + _writtenBuffers.withLock { $0.append(buffer) } + _isFinished.withLock { $0 = true } } } /// Mock implementation with custom behavior for integration testing -final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter { - private(set) var writtenBuffers: [ByteBuffer] = [] - private(set) var customBehaviorTriggered = false - private(set) var isFinished = false - private(set) var hasCustomHeaders = false +@available(LambdaSwift 2.0, *) +final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter, Sendable { + private let _writtenBuffers: Mutex<[ByteBuffer]> = Mutex([]) + private let _customBehaviorTriggered: Mutex = Mutex(false) + private let _isFinished: Mutex = Mutex(false) + private let _hasCustomHeaders: Mutex = Mutex(false) + + var writtenBuffers: [ByteBuffer] { + _writtenBuffers.withLock { $0 } + } + + var customBehaviorTriggered: Bool { + _customBehaviorTriggered.withLock { $0 } + } + + var isFinished: Bool { + _isFinished.withLock { $0 } + } + + var hasCustomHeaders: Bool { + _hasCustomHeaders.withLock { $0 } + } func writeStatusAndHeaders( _ response: Response, encoder: (any LambdaOutputEncoder)? = nil ) async throws { - customBehaviorTriggered = true + _customBehaviorTriggered.withLock { $0 = true } var buffer = ByteBuffer() buffer.writeString("{\"statusCode\":200}") try await write(buffer, hasCustomHeaders: true) @@ -745,18 +821,18 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws { // Trigger custom behavior on any write - customBehaviorTriggered = true - self.hasCustomHeaders = hasCustomHeaders - writtenBuffers.append(buffer) + _customBehaviorTriggered.withLock { $0 = true } + _hasCustomHeaders.withLock { $0 = hasCustomHeaders } + _writtenBuffers.withLock { $0.append(buffer) } } func finish() async throws { - isFinished = true + _isFinished.withLock { $0 = true } } func writeAndFinish(_ buffer: ByteBuffer) async throws { - customBehaviorTriggered = true - writtenBuffers.append(buffer) - isFinished = true + _customBehaviorTriggered.withLock { $0 = true } + _writtenBuffers.withLock { $0.append(buffer) } + _isFinished.withLock { $0 = true } } } From 949386fe06d77b7b95f513231279a8120b430fc0 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sat, 6 Dec 2025 18:13:20 -0600 Subject: [PATCH 4/8] fix examples --- Examples/APIGatewayV2+LambdaAuthorizer/Package.swift | 2 +- .../Sources/AuthorizerLambda/main.swift | 2 +- Examples/HummingbirdLambda/Package.swift | 4 ++-- Examples/Streaming+Codable/Package.swift | 2 +- .../Streaming+Codable/Sources/LambdaStreaming+Codable.swift | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Examples/APIGatewayV2+LambdaAuthorizer/Package.swift b/Examples/APIGatewayV2+LambdaAuthorizer/Package.swift index 5d714cb2..8e6a3ee6 100644 --- a/Examples/APIGatewayV2+LambdaAuthorizer/Package.swift +++ b/Examples/APIGatewayV2+LambdaAuthorizer/Package.swift @@ -16,7 +16,7 @@ let package = Package( // For standalone usage, comment the line above and uncomment below: // .package(url: "https://github.com/awslabs/swift-aws-lambda-runtime.git", from: "2.0.0"), - .package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.0.0"), + .package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.4.0"), ], targets: [ .executableTarget( diff --git a/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift b/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift index 34f0180d..3cb54bb2 100644 --- a/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift +++ b/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift @@ -58,7 +58,7 @@ let policyAuthorizerHandler: // // This code doesn't perform any type of token validation. It should be used as a reference only. let simpleAuthorizerHandler: - (APIGatewayLambdaAuthorizerRequest, LambdaContext) async throws -> APIGatewayLambdaAuthorizerSimpleResponse = { + @Sendable (APIGatewayLambdaAuthorizerRequest, LambdaContext) async throws -> APIGatewayLambdaAuthorizerSimpleResponse = { (_: APIGatewayLambdaAuthorizerRequest, context: LambdaContext) in context.logger.debug("+++ Simple Authorizer called +++") diff --git a/Examples/HummingbirdLambda/Package.swift b/Examples/HummingbirdLambda/Package.swift index de09ea8f..6f21ec4e 100644 --- a/Examples/HummingbirdLambda/Package.swift +++ b/Examples/HummingbirdLambda/Package.swift @@ -8,7 +8,7 @@ let package = Package( platforms: [.macOS(.v15)], dependencies: [ // For local development (default) - .package(name: "swift-aws-lambda-runtime", path: "../.."), + // .package(name: "swift-aws-lambda-runtime", path: "../.."), // For standalone usage, comment the line above and uncomment below: // .package(url: "https://github.com/awslabs/swift-aws-lambda-runtime.git", from: "2.0.0"), @@ -17,7 +17,7 @@ let package = Package( url: "https://github.com/hummingbird-project/hummingbird-lambda.git", branch: "main" ), - .package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.1.0"), + .package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.4.0"), ], targets: [ .executableTarget( diff --git a/Examples/Streaming+Codable/Package.swift b/Examples/Streaming+Codable/Package.swift index c3e68e45..bffd0c31 100644 --- a/Examples/Streaming+Codable/Package.swift +++ b/Examples/Streaming+Codable/Package.swift @@ -12,7 +12,7 @@ let package = Package( // For standalone usage, comment the line above and uncomment below: // .package(url: "https://github.com/awslabs/swift-aws-lambda-runtime.git", from: "2.0.0"), - .package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.0.0"), + .package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.4.0"), ], targets: [ .executableTarget( diff --git a/Examples/Streaming+Codable/Sources/LambdaStreaming+Codable.swift b/Examples/Streaming+Codable/Sources/LambdaStreaming+Codable.swift index 31f5e9d9..b46cfea1 100644 --- a/Examples/Streaming+Codable/Sources/LambdaStreaming+Codable.swift +++ b/Examples/Streaming+Codable/Sources/LambdaStreaming+Codable.swift @@ -28,7 +28,7 @@ import Foundation /// This handler protocol supports response streaming and background work execution. /// Background work can be executed after closing the response stream by calling /// ``LambdaResponseStreamWriter/finish()`` or ``LambdaResponseStreamWriter/writeAndFinish(_:)``. -public protocol StreamingLambdaHandlerWithEvent: _Lambda_SendableMetatype { +public protocol StreamingLambdaHandlerWithEvent: Sendable, _Lambda_SendableMetatype { /// Generic input type that will be decoded from JSON. associatedtype Event: Decodable From bee9a592e1d73e0900b62a1f94c3737bb5f8f922 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sat, 6 Dec 2025 18:14:57 -0600 Subject: [PATCH 5/8] swift format --- .../Sources/AuthorizerLambda/main.swift | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift b/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift index 3cb54bb2..c49163d9 100644 --- a/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift +++ b/Examples/APIGatewayV2+LambdaAuthorizer/Sources/AuthorizerLambda/main.swift @@ -58,21 +58,22 @@ let policyAuthorizerHandler: // // This code doesn't perform any type of token validation. It should be used as a reference only. let simpleAuthorizerHandler: - @Sendable (APIGatewayLambdaAuthorizerRequest, LambdaContext) async throws -> APIGatewayLambdaAuthorizerSimpleResponse = { - (_: APIGatewayLambdaAuthorizerRequest, context: LambdaContext) in + @Sendable (APIGatewayLambdaAuthorizerRequest, LambdaContext) async throws -> + APIGatewayLambdaAuthorizerSimpleResponse = { + (_: APIGatewayLambdaAuthorizerRequest, context: LambdaContext) in - context.logger.debug("+++ Simple Authorizer called +++") + context.logger.debug("+++ Simple Authorizer called +++") - // typically, this function will check the validity of the incoming token received in the request + // typically, this function will check the validity of the incoming token received in the request - return APIGatewayLambdaAuthorizerSimpleResponse( - // this is the authorization decision: yes or no - isAuthorized: true, + return APIGatewayLambdaAuthorizerSimpleResponse( + // this is the authorization decision: yes or no + isAuthorized: true, - // this is additional context we want to return to the caller - context: ["abc1": "xyz1"] - ) - } + // this is additional context we want to return to the caller + context: ["abc1": "xyz1"] + ) + } // create the runtime and start polling for new events. // in this demo we use the simple authorizer handler From 1faeecfb51ad677ff8d1771c3ca30b7683ba6913 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Mon, 8 Dec 2025 09:09:35 +0100 Subject: [PATCH 6/8] add links to the doc in the source code --- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 72ca95df..bd409d5f 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -80,9 +80,12 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb // Get the max concurrency authorized by user when running on // Lambda Managed Instances - // This is not documented anywhere, except the NodeJS runtime + // See: + // - https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances.html#lambda-managed-instances-concurrency-model + // - https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html + // + // and the NodeJS implementation // https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/utils/env.ts#L34 - // and // https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/worker/ignition.ts#L12 let maxConcurrency = Int(Lambda.env("AWS_LAMBDA_MAX_CONCURRENCY") ?? "1") ?? 1 From 9d66b717f3ccd57c7b37c91248ed596d35fe2812 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Mon, 8 Dec 2025 09:11:21 +0100 Subject: [PATCH 7/8] consistently use of self. --- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index bd409d5f..4f48eb78 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -94,7 +94,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb self.logger.trace("Starting one Runtime Interface Client") try await self.startRuntimeInterfaceClient( endpoint: runtimeEndpoint, - handler: handler, + handler: self.handler, eventLoop: self.eventLoop, logger: self.logger ) @@ -111,7 +111,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb endpoint: runtimeEndpoint, handler: self.handler, eventLoop: self.eventLoop, - logger: logger + logger: self.logger ) } } From 3b076fa9207fe2623c145c61cac66cc8c63099c7 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Mon, 8 Dec 2025 09:15:58 +0100 Subject: [PATCH 8/8] fix logger --- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 4f48eb78..cc695c81 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -111,7 +111,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb endpoint: runtimeEndpoint, handler: self.handler, eventLoop: self.eventLoop, - logger: self.logger + logger: logger ) } }