@@ -90,6 +90,7 @@ def __init__(
9090 self ._completion_events : dict [str , Event ] = {}
9191 self ._callback_timeouts : dict [str , Future ] = {}
9292 self ._callback_heartbeats : dict [str , Future ] = {}
93+ self ._execution_timeout : Future | None = None
9394
9495 def start_execution (
9596 self ,
@@ -118,6 +119,26 @@ def start_execution(
118119 completion_event = self ._scheduler .create_event ()
119120 self ._completion_events [execution .durable_execution_arn ] = completion_event
120121
122+ # Schedule execution timeout
123+ try :
124+ timeout_seconds = input .execution_timeout_seconds
125+ if timeout_seconds and timeout_seconds > 0 :
126+
127+ def timeout_handler ():
128+ error = ErrorObject .from_message (
129+ f"Execution timed out after { timeout_seconds } seconds."
130+ )
131+ self .on_timed_out (execution .durable_execution_arn , error )
132+
133+ self ._execution_timeout = self ._scheduler .call_later (
134+ timeout_handler ,
135+ delay = timeout_seconds ,
136+ completion_event = completion_event ,
137+ )
138+ except (AttributeError , TypeError ):
139+ # Handle Mock objects or invalid timeout values in tests
140+ pass
141+
121142 # Schedule initial invocation to run immediately
122143 self ._invoke_execution (execution .durable_execution_arn )
123144
@@ -897,6 +918,9 @@ def _complete_events(self, execution_arn: str):
897918 # complete doesn't actually checkpoint explicitly
898919 if event := self ._completion_events .get (execution_arn ):
899920 event .set ()
921+ if self ._execution_timeout :
922+ self ._execution_timeout .cancel ()
923+ self ._execution_timeout = None
900924
901925 def wait_until_complete (
902926 self , execution_arn : str , timeout : float | None = None
0 commit comments