Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 102 additions & 12 deletions src/aws_durable_execution_sdk_python_testing/invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
DurableExecutionInvocationInputWithClient,
DurableExecutionInvocationOutput,
InitialExecutionState,
InvocationStatus,
)

from aws_durable_execution_sdk_python_testing.exceptions import (
DurableFunctionsTestError,
)
from aws_durable_execution_sdk_python_testing.model import LambdaContext


if TYPE_CHECKING:
from collections.abc import Callable

Expand Down Expand Up @@ -143,17 +143,107 @@ def invoke(
function_name: str,
input: DurableExecutionInvocationInput,
) -> DurableExecutionInvocationOutput:
# TODO: wrap ResourceNotFoundException from lambda in ResourceNotFoundException from this lib
response = self.lambda_client.invoke(
FunctionName=function_name,
InvocationType="RequestResponse", # Synchronous invocation
Payload=json.dumps(input.to_dict(), default=str),
"""Invoke AWS Lambda function and return durable execution result.

Args:
function_name: Name of the Lambda function to invoke
input: Durable execution invocation input

Returns:
DurableExecutionInvocationOutput: Result of the function execution

Raises:
ResourceNotFoundException: If function does not exist
InvalidParameterValueException: If parameters are invalid
DurableFunctionsTestError: For other invocation failures
"""
from aws_durable_execution_sdk_python_testing.exceptions import (
ResourceNotFoundException,
InvalidParameterValueException,
)

# very simplified placeholder lol
if response["StatusCode"] == 200: # noqa: PLR2004
json_response = json.loads(response["Payload"].read().decode("utf-8"))
return DurableExecutionInvocationOutput.from_dict(json_response)
# Parameter validation
if not function_name or not function_name.strip():
msg = "Function name is required"
raise InvalidParameterValueException(msg)

try:
# Invoke AWS Lambda function using standard invoke method
response = self.lambda_client.invoke(
FunctionName=function_name,
InvocationType="RequestResponse", # Synchronous invocation
Payload=json.dumps(input.to_dict(), default=str),
)

msg: str = f"Lambda invocation failed with status code: {response['StatusCode']}, {response['Payload']=}"
raise DurableFunctionsTestError(msg)
# Check HTTP status code
status_code = response.get("StatusCode")
if status_code not in (200, 202, 204):
msg = f"Lambda invocation failed with status code: {status_code}"
raise DurableFunctionsTestError(msg)

# Check for function errors
if "FunctionError" in response:
error_payload = response["Payload"].read().decode("utf-8")
msg = f"Lambda invocation failed with status {status_code}: {error_payload}"
raise DurableFunctionsTestError(msg)

# Parse response payload
response_payload = response["Payload"].read().decode("utf-8")
response_dict = json.loads(response_payload)

# Convert to DurableExecutionInvocationOutput
return DurableExecutionInvocationOutput.from_dict(response_dict)

except self.lambda_client.exceptions.ResourceNotFoundException as e:
msg = f"Function not found: {function_name}"
raise ResourceNotFoundException(msg) from e
except self.lambda_client.exceptions.InvalidParameterValueException as e:
msg = f"Invalid parameter: {e}"
raise InvalidParameterValueException(msg) from e
except (
self.lambda_client.exceptions.TooManyRequestsException,
self.lambda_client.exceptions.ServiceException,
self.lambda_client.exceptions.ResourceConflictException,
self.lambda_client.exceptions.InvalidRequestContentException,
self.lambda_client.exceptions.RequestTooLargeException,
self.lambda_client.exceptions.UnsupportedMediaTypeException,
self.lambda_client.exceptions.InvalidRuntimeException,
self.lambda_client.exceptions.InvalidZipFileException,
self.lambda_client.exceptions.ResourceNotReadyException,
self.lambda_client.exceptions.SnapStartTimeoutException,
self.lambda_client.exceptions.SnapStartNotReadyException,
self.lambda_client.exceptions.SnapStartException,
self.lambda_client.exceptions.RecursiveInvocationException,
) as e:
msg = f"Lambda invocation failed: {e}"
raise DurableFunctionsTestError(msg) from e
except (
self.lambda_client.exceptions.InvalidSecurityGroupIDException,
self.lambda_client.exceptions.EC2ThrottledException,
self.lambda_client.exceptions.EFSMountConnectivityException,
self.lambda_client.exceptions.SubnetIPAddressLimitReachedException,
self.lambda_client.exceptions.EC2UnexpectedException,
self.lambda_client.exceptions.InvalidSubnetIDException,
self.lambda_client.exceptions.EC2AccessDeniedException,
self.lambda_client.exceptions.EFSIOException,
self.lambda_client.exceptions.ENILimitReachedException,
self.lambda_client.exceptions.EFSMountTimeoutException,
self.lambda_client.exceptions.EFSMountFailureException,
) as e:
msg = f"Lambda infrastructure error: {e}"
raise DurableFunctionsTestError(msg) from e
except (
self.lambda_client.exceptions.KMSAccessDeniedException,
self.lambda_client.exceptions.KMSDisabledException,
self.lambda_client.exceptions.KMSNotFoundException,
self.lambda_client.exceptions.KMSInvalidStateException,
) as e:
msg = f"Lambda KMS error: {e}"
raise DurableFunctionsTestError(msg) from e
except Exception as e:
# Handle any remaining exceptions, including custom ones like DurableExecutionAlreadyStartedException
if "DurableExecutionAlreadyStartedException" in str(type(e)):
msg = f"Durable execution already started: {e}"
raise DurableFunctionsTestError(msg) from e
msg = f"Unexpected error during Lambda invocation: {e}"
raise DurableFunctionsTestError(msg) from e
Loading
Loading