Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Examples/APIGatewayV2+LambdaAuthorizer/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ let package = Package(
// For standalone usage, comment the line above and uncomment below:
// .package(url: "https://github.com/awslabs/swift-aws-lambda-runtime.git", from: "2.0.0"),

.package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.0.0"),
.package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.4.0"),
],
targets: [
.executableTarget(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,22 @@ let policyAuthorizerHandler:
//
// This code doesn't perform any type of token validation. It should be used as a reference only.
let simpleAuthorizerHandler:
(APIGatewayLambdaAuthorizerRequest, LambdaContext) async throws -> APIGatewayLambdaAuthorizerSimpleResponse = {
(_: APIGatewayLambdaAuthorizerRequest, context: LambdaContext) in
@Sendable (APIGatewayLambdaAuthorizerRequest, LambdaContext) async throws ->
APIGatewayLambdaAuthorizerSimpleResponse = {
(_: APIGatewayLambdaAuthorizerRequest, context: LambdaContext) in

context.logger.debug("+++ Simple Authorizer called +++")
context.logger.debug("+++ Simple Authorizer called +++")

// typically, this function will check the validity of the incoming token received in the request
// typically, this function will check the validity of the incoming token received in the request

return APIGatewayLambdaAuthorizerSimpleResponse(
// this is the authorization decision: yes or no
isAuthorized: true,
return APIGatewayLambdaAuthorizerSimpleResponse(
// this is the authorization decision: yes or no
isAuthorized: true,

// this is additional context we want to return to the caller
context: ["abc1": "xyz1"]
)
}
// this is additional context we want to return to the caller
context: ["abc1": "xyz1"]
)
}

// create the runtime and start polling for new events.
// in this demo we use the simple authorizer handler
Expand Down
4 changes: 2 additions & 2 deletions Examples/HummingbirdLambda/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ let package = Package(
platforms: [.macOS(.v15)],
dependencies: [
// For local development (default)
.package(name: "swift-aws-lambda-runtime", path: "../.."),
// .package(name: "swift-aws-lambda-runtime", path: "../.."),

// For standalone usage, comment the line above and uncomment below:
// .package(url: "https://github.com/awslabs/swift-aws-lambda-runtime.git", from: "2.0.0"),
Expand All @@ -17,7 +17,7 @@ let package = Package(
url: "https://github.com/hummingbird-project/hummingbird-lambda.git",
branch: "main"
),
.package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.1.0"),
.package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.4.0"),
],
targets: [
.executableTarget(
Expand Down
2 changes: 1 addition & 1 deletion Examples/Streaming+Codable/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let package = Package(
// For standalone usage, comment the line above and uncomment below:
// .package(url: "https://github.com/awslabs/swift-aws-lambda-runtime.git", from: "2.0.0"),

.package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.0.0"),
.package(url: "https://github.com/awslabs/swift-aws-lambda-events.git", from: "1.4.0"),
],
targets: [
.executableTarget(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import Foundation
/// This handler protocol supports response streaming and background work execution.
/// Background work can be executed after closing the response stream by calling
/// ``LambdaResponseStreamWriter/finish()`` or ``LambdaResponseStreamWriter/writeAndFinish(_:)``.
public protocol StreamingLambdaHandlerWithEvent: _Lambda_SendableMetatype {
public protocol StreamingLambdaHandlerWithEvent: Sendable, _Lambda_SendableMetatype {
/// Generic input type that will be decoded from JSON.
associatedtype Event: Decodable

Expand Down
4 changes: 2 additions & 2 deletions Sources/AWSLambdaRuntime/FoundationSupport/Lambda+JSON.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ extension LambdaRuntime {
decoder: JSONDecoder = JSONDecoder(),
encoder: JSONEncoder = JSONEncoder(),
logger: Logger = Logger(label: "LambdaRuntime"),
body: sending @escaping (Event, LambdaContext) async throws -> Output
body: @Sendable @escaping (Event, LambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
Expand All @@ -164,7 +164,7 @@ extension LambdaRuntime {
public convenience init<Event: Decodable>(
decoder: JSONDecoder = JSONDecoder(),
logger: Logger = Logger(label: "LambdaRuntime"),
body: sending @escaping (Event, LambdaContext) async throws -> Void
body: @Sendable @escaping (Event, LambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
Expand Down
4 changes: 2 additions & 2 deletions Sources/AWSLambdaRuntime/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIOCore

/// The protocol a decoder must conform to so that it can be used with ``LambdaCodableAdapter`` to decode incoming
/// `ByteBuffer` events.
public protocol LambdaEventDecoder {
public protocol LambdaEventDecoder: Sendable {
/// Decode the `ByteBuffer` representing the received event into the generic `Event` type
/// the handler will receive.
/// - Parameters:
Expand All @@ -29,7 +29,7 @@ public protocol LambdaEventDecoder {

/// The protocol an encoder must conform to so that it can be used with ``LambdaCodableAdapter`` to encode the generic
/// ``LambdaOutputEncoder/Output`` object into a `ByteBuffer`.
public protocol LambdaOutputEncoder {
public protocol LambdaOutputEncoder: Sendable {
associatedtype Output

/// Encode the generic type `Output` the handler has returned into a `ByteBuffer`.
Expand Down
22 changes: 11 additions & 11 deletions Sources/AWSLambdaRuntime/LambdaHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import NIOCore
/// ``LambdaResponseStreamWriter/finish()`` or ``LambdaResponseStreamWriter/writeAndFinish(_:)``,
/// the ``handle(_:responseWriter:context:)`` function is free to execute any background work.
@available(LambdaSwift 2.0, *)
public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
public protocol StreamingLambdaHandler: Sendable, _Lambda_SendableMetatype {
/// The handler function -- implement the business logic of the Lambda function here.
/// - Parameters:
/// - event: The invocation's input data.
Expand All @@ -48,7 +48,7 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {

/// A writer object to write the Lambda response stream into. The HTTP response is started lazily.
/// before the first call to ``write(_:)`` or ``writeAndFinish(_:)``.
public protocol LambdaResponseStreamWriter {
public protocol LambdaResponseStreamWriter: Sendable {
/// Write a response part into the stream. Bytes written are streamed continually.
/// - Parameter buffer: The buffer to write.
/// - Parameter hasCustomHeaders: If `true`, the response will be sent with custom HTTP status code and headers.
Expand All @@ -69,7 +69,7 @@ public protocol LambdaResponseStreamWriter {
/// - note: This handler protocol does not support response streaming because the output has to be encoded prior to it being sent, e.g. it is not possible to encode a partial/incomplete JSON string.
/// This protocol also does not support the execution of background work after the response has been returned -- the ``LambdaWithBackgroundProcessingHandler`` protocol caters for such use-cases.
@available(LambdaSwift 2.0, *)
public protocol LambdaHandler {
public protocol LambdaHandler: Sendable {
/// Generic input type.
/// The body of the request sent to Lambda will be decoded into this type for the handler to consume.
associatedtype Event
Expand All @@ -92,7 +92,7 @@ public protocol LambdaHandler {
/// ``LambdaWithBackgroundProcessingHandler/handle(_:outputWriter:context:)`` function is then
/// free to implement any background work after the result has been sent to the AWS Lambda control plane.
@available(LambdaSwift 2.0, *)
public protocol LambdaWithBackgroundProcessingHandler {
public protocol LambdaWithBackgroundProcessingHandler: Sendable {
/// Generic input type.
/// The body of the request sent to Lambda will be decoded into this type for the handler to consume.
associatedtype Event
Expand All @@ -116,7 +116,7 @@ public protocol LambdaWithBackgroundProcessingHandler {
/// Used with ``LambdaWithBackgroundProcessingHandler``.
/// A mechanism to "return" an output from ``LambdaWithBackgroundProcessingHandler/handle(_:outputWriter:context:)`` without the function needing to
/// have a return type and exit at that point. This allows for background work to be executed _after_ a response has been sent to the AWS Lambda response endpoint.
public protocol LambdaResponseWriter<Output> {
public protocol LambdaResponseWriter<Output>: Sendable {
associatedtype Output
/// Sends the generic ``LambdaResponseWriter/Output`` object (representing the computed result of the handler)
/// to the AWS Lambda response endpoint.
Expand Down Expand Up @@ -157,18 +157,18 @@ public struct StreamingClosureHandler: StreamingLambdaHandler {
/// A ``LambdaHandler`` conforming handler object that can be constructed with a closure.
/// Allows for a handler to be defined in a clean manner, leveraging Swift's trailing closure syntax.
@available(LambdaSwift 2.0, *)
public struct ClosureHandler<Event: Decodable, Output>: LambdaHandler {
let body: (Event, LambdaContext) async throws -> Output
public struct ClosureHandler<Event: Decodable & Sendable, Output>: LambdaHandler {
let body: @Sendable (Event, LambdaContext) async throws -> Output

/// Initialize with a closure handler over generic `Input` and `Output` types.
/// - Parameter body: The handler function written as a closure.
public init(body: sending @escaping (Event, LambdaContext) async throws -> Output) where Output: Encodable {
public init(body: @Sendable @escaping (Event, LambdaContext) async throws -> Output) where Output: Encodable {
self.body = body
}

/// Initialize with a closure handler over a generic `Input` type, and a `Void` `Output`.
/// - Parameter body: The handler function written as a closure.
public init(body: @escaping (Event, LambdaContext) async throws -> Void) where Output == Void {
public init(body: @Sendable @escaping (Event, LambdaContext) async throws -> Void) where Output == Void {
self.body = body
}

Expand Down Expand Up @@ -210,7 +210,7 @@ extension LambdaRuntime {
encoder: sending Encoder,
decoder: sending Decoder,
logger: Logger = Logger(label: "LambdaRuntime"),
body: sending @escaping (Event, LambdaContext) async throws -> Output
body: @Sendable @escaping (Event, LambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
Expand Down Expand Up @@ -240,7 +240,7 @@ extension LambdaRuntime {
public convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
decoder: sending Decoder,
logger: Logger = Logger(label: "LambdaRuntime"),
body: sending @escaping (Event, LambdaContext) async throws -> Void
body: @Sendable @escaping (Event, LambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
Expand Down
115 changes: 79 additions & 36 deletions Sources/AWSLambdaRuntime/LambdaRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ private let _isRunning = Atomic<Bool>(false)
@available(LambdaSwift 2.0, *)
public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
@usableFromInline
/// we protect the handler behind a Mutex to ensure that we only ever have one copy of it
let handlerStorage: SendingStorage<Handler>
let handler: Handler
@usableFromInline
let logger: Logger
@usableFromInline
Expand All @@ -37,7 +36,7 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
self.handlerStorage = SendingStorage(handler)
self.handler = handler
self.eventLoop = eventLoop

// by setting the log level here, we understand it can not be changed dynamically at runtime
Expand Down Expand Up @@ -74,45 +73,50 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
_isRunning.store(false, ordering: .releasing)
}

// The handler can be non-sendable, we want to ensure we only ever have one copy of it
let handler = try? self.handlerStorage.get()
guard let handler else {
throw LambdaRuntimeError(code: .handlerCanOnlyBeGetOnce)
}

// are we running inside an AWS Lambda runtime environment ?
// AWS_LAMBDA_RUNTIME_API is set when running on Lambda
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html
if let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") {

let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) }

do {
try await LambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
// Get the max concurrency authorized by user when running on
// Lambda Managed Instances
// See:
// - https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances.html#lambda-managed-instances-concurrency-model
// - https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
//
// and the NodeJS implementation
// https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/utils/env.ts#L34
// https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a4560c87426fa0a34756296a30d7add1388e575c/src/worker/ignition.ts#L12
let maxConcurrency = Int(Lambda.env("AWS_LAMBDA_MAX_CONCURRENCY") ?? "1") ?? 1

// when max concurrency is 1, do not pay the overhead of launching a Task
if maxConcurrency <= 1 {
self.logger.trace("Starting one Runtime Interface Client")
try await self.startRuntimeInterfaceClient(
endpoint: runtimeEndpoint,
handler: self.handler,
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
} catch {
// catch top level errors that have not been handled until now
// this avoids the runtime to crash and generate a backtrace
if let error = error as? LambdaRuntimeError,
error.code != .connectionToControlPlaneLost
{
// if the error is a LambdaRuntimeError but not a connection error,
// we rethrow it to preserve existing behaviour
self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"])
throw error
} else {
self.logger.trace("LambdaRuntime.run() connection lost")
)
} else {
try await withThrowingTaskGroup(of: Void.self) { group in

self.logger.trace("Starting \(maxConcurrency) Runtime Interface Clients")
for i in 0..<maxConcurrency {

group.addTask {
var logger = self.logger
logger[metadataKey: "RIC"] = "\(i)"
try await self.startRuntimeInterfaceClient(
endpoint: runtimeEndpoint,
handler: self.handler,
eventLoop: self.eventLoop,
logger: logger
)
}
}
// Wait for all tasks to complete and propagate any errors
try await group.waitForAll()
}
}

Expand Down Expand Up @@ -141,7 +145,7 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
handler: self.handler,
logger: self.logger
)
}
Expand All @@ -152,4 +156,43 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
#endif
}
}

internal func startRuntimeInterfaceClient(
endpoint: String,
handler: Handler,
eventLoop: EventLoop,
logger: Logger
) async throws {

let ipAndPort = endpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) }

do {
try await LambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: eventLoop,
logger: logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: logger
)
}
} catch {
// catch top level errors that have not been handled until now
// this avoids the runtime to crash and generate a backtrace
if let error = error as? LambdaRuntimeError,
error.code != .connectionToControlPlaneLost
{
// if the error is a LambdaRuntimeError but not a connection error,
// we rethrow it to preserve existing behaviour
logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"])
throw error
} else {
logger.trace("LambdaRuntime.run() connection lost")
}
}
}
}
1 change: 0 additions & 1 deletion Sources/AWSLambdaRuntime/LambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ package struct LambdaRuntimeError: Error {

case missingLambdaRuntimeAPIEnvironmentVariable
case runtimeCanOnlyBeStartedOnce
case handlerCanOnlyBeGetOnce
case invalidPort
}

Expand Down
Loading
Loading