@@ -90,6 +90,7 @@ def __init__(
9090 self ._completion_events : dict [str , Event ] = {}
9191 self ._callback_timeouts : dict [str , Future ] = {}
9292 self ._callback_heartbeats : dict [str , Future ] = {}
93+ self ._execution_timeout : Future | None = None
9394
9495 def start_execution (
9596 self ,
@@ -118,6 +119,21 @@ def start_execution(
118119 completion_event = self ._scheduler .create_event ()
119120 self ._completion_events [execution .durable_execution_arn ] = completion_event
120121
122+ # Schedule execution timeout
123+ if input .execution_timeout_seconds > 0 :
124+
125+ def timeout_handler ():
126+ error = ErrorObject .from_message (
127+ f"Execution timed out after { input .execution_timeout_seconds } seconds."
128+ )
129+ self .on_timed_out (execution .durable_execution_arn , error )
130+
131+ self ._execution_timeout = self ._scheduler .call_later (
132+ timeout_handler ,
133+ delay = input .execution_timeout_seconds ,
134+ completion_event = completion_event ,
135+ )
136+
121137 # Schedule initial invocation to run immediately
122138 self ._invoke_execution (execution .durable_execution_arn )
123139
@@ -897,6 +913,9 @@ def _complete_events(self, execution_arn: str):
897913 # complete doesn't actually checkpoint explicitly
898914 if event := self ._completion_events .get (execution_arn ):
899915 event .set ()
916+ if self ._execution_timeout :
917+ self ._execution_timeout .cancel ()
918+ self ._execution_timeout = None
900919
901920 def wait_until_complete (
902921 self , execution_arn : str , timeout : float | None = None
0 commit comments