Skip to content

Commit 4946d94

Browse files
committed
Refactor and add asyncio.Task to HandlerExecution dataclass
1 parent 8efe5d4 commit 4946d94

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -518,9 +518,9 @@ async def run_update() -> None:
518518
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. "
519519
f"known updates: [{' '.join(known_updates)}]"
520520
)
521-
self._in_progress_updates[job.id] = HandlerExecution(
522-
job.name, defn.unfinished_policy, job.id
523-
)
521+
self._in_progress_updates[
522+
job.id
523+
].unfinished_policy = defn.unfinished_policy
524524
args = self._process_handler_args(
525525
job.name,
526526
job.input,
@@ -571,7 +571,7 @@ async def run_update() -> None:
571571
# All asyncio cancelled errors become Temporal cancelled errors
572572
if isinstance(err, asyncio.CancelledError):
573573
err = temporalio.exceptions.CancelledError(
574-
f"Cancellation raised within update {err}"
574+
f"Cancellation raised within update: {err}"
575575
)
576576
# Read-only issues during validation should fail the task
577577
if isinstance(err, temporalio.workflow.ReadOnlyContextError):
@@ -606,10 +606,16 @@ async def run_update() -> None:
606606
finally:
607607
self._in_progress_updates.pop(job.id, None)
608608

609-
self.create_task(
609+
task = self.create_task(
610610
run_update(),
611611
name=f"update: {job.name}",
612612
)
613+
self._in_progress_updates[job.id] = HandlerExecution(
614+
job.name,
615+
task,
616+
temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON,
617+
job.id,
618+
)
613619

614620
def _apply_fire_timer(
615621
self, job: temporalio.bridge.proto.workflow_activation.FireTimer
@@ -1729,20 +1735,20 @@ def _process_signal_job(
17291735
signal=job.signal_name, args=args, headers=job.headers
17301736
)
17311737

1738+
task = self.create_task(
1739+
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
1740+
name=f"signal: {job.signal_name}",
1741+
)
17321742
self._handled_signals_seq += 1
17331743
id = self._handled_signals_seq
1734-
self._in_progress_signals[id] = HandlerExecution(
1735-
job.signal_name, defn.unfinished_policy
1736-
)
17371744

1738-
def done_callback(f):
1745+
def done_callback(_):
17391746
self._in_progress_signals.pop(id, None)
17401747

1741-
task = self.create_task(
1742-
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
1743-
name=f"signal: {job.signal_name}",
1744-
)
17451748
task.add_done_callback(done_callback)
1749+
self._in_progress_signals[id] = HandlerExecution(
1750+
job.signal_name, task, defn.unfinished_policy
1751+
)
17461752

17471753
def _register_task(
17481754
self,
@@ -2811,6 +2817,7 @@ class HandlerExecution:
28112817
"""Information about an execution of a signal or update handler."""
28122818

28132819
name: str
2820+
task: asyncio.Task[None]
28142821
unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy
28152822
id: Optional[str] = None
28162823

0 commit comments

Comments
 (0)