From ac7f9eaade16942c03c52836189e62fdf0233194 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:15:56 +0200 Subject: [PATCH 01/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - fail raise mageflwo error --- tests/integration/hatchet/models.py | 4 ++++ tests/integration/hatchet/worker.py | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/integration/hatchet/models.py b/tests/integration/hatchet/models.py index 94f75e4..b188bd1 100644 --- a/tests/integration/hatchet/models.py +++ b/tests/integration/hatchet/models.py @@ -35,3 +35,7 @@ class CommandMessageWithResult(ContextMessage): class SleepTaskMessage(ContextMessage): sleep_time: int = 2 result: Any = None + + +class MageflowTestError(Exception): + pass diff --git a/tests/integration/hatchet/worker.py b/tests/integration/hatchet/worker.py index 9cdd05c..d685042 100644 --- a/tests/integration/hatchet/worker.py +++ b/tests/integration/hatchet/worker.py @@ -29,6 +29,7 @@ MessageWithResult, CommandMessageWithResult, SleepTaskMessage, + MageflowTestError, ) settings = Dynaconf( @@ -94,7 +95,7 @@ def chain_callback(msg): @hatchet.task(name="fail_task", input_validator=ContextMessage) def fail_task(msg): - raise ValueError("Test exception") + raise MageflowTestError("Test exception") @hatchet.durable_task(name="sleep_task", input_validator=SleepTaskMessage) From bf8526308cdd61ca602a75933a7640b97025f758 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:16:27 +0200 Subject: [PATCH 02/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - added test to check that normal fail task raises the correct error message --- .../hatchet/signature/test_edge_case.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/integration/hatchet/signature/test_edge_case.py b/tests/integration/hatchet/signature/test_edge_case.py index 6a01519..2814220 100644 --- a/tests/integration/hatchet/signature/test_edge_case.py +++ b/tests/integration/hatchet/signature/test_edge_case.py @@ -2,6 +2,7 @@ from datetime import datetime import pytest +from hatchet_sdk.clients.rest import V1TaskStatus import mageflow from tests.integration.hatchet.assertions import ( @@ -12,7 +13,7 @@ map_wf_by_id, ) from tests.integration.hatchet.conftest import HatchetInitData -from tests.integration.hatchet.models import ContextMessage +from tests.integration.hatchet.models import ContextMessage, MageflowTestError from tests.integration.hatchet.worker import ( timeout_task, error_callback, @@ -20,6 +21,7 @@ retry_to_failure, task1_callback, cancel_retry, + fail_task, ) @@ -138,3 +140,31 @@ async def test__retry_but_override_with_exception__check_error_callback_is_calle failed_summary = assert_signature_failed(runs, cancel_retry_sign) assert failed_summary.retry_count == 0 assert_signature_done(runs, error_callback_sign, base_data=test_ctx) + + +@pytest.mark.asyncio(loop_scope="session") +async def test_check_normal_task_fails__sanity( + hatchet_client_init: HatchetInitData, test_ctx, ctx_metadata, trigger_options +): + # Arrange + redis_client, hatchet = ( + hatchet_client_init.redis_client, + hatchet_client_init.hatchet, + ) + + # Act + message = ContextMessage(base_data=test_ctx) + await fail_task.aio_run_no_wait(message, options=trigger_options) + await asyncio.sleep(3) + + # Assert + runs = await get_runs(hatchet, ctx_metadata) + + assert len(runs) == 1 + failed_task_summary = runs[0] + assert failed_task_summary.status == V1TaskStatus.FAILED + error_class_name = MageflowTestError.__name__ + err_msg = failed_task_summary.error_message + assert err_msg.startswith( + error_class_name + ), f"{err_msg} doesn't start with {error_class_name}" From 2d7a93ee6da76f2c3de626b1055fc887917f7d6a Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:27:44 +0200 Subject: [PATCH 03/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - check if task is normal task, if true raise error noramlly --- mageflow/callbacks.py | 3 +++ mageflow/invokers/hatchet.py | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/mageflow/callbacks.py b/mageflow/callbacks.py index 1812f03..f363f33 100644 --- a/mageflow/callbacks.py +++ b/mageflow/callbacks.py @@ -40,6 +40,7 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): # NOTE: This should not run, the task should cancel, but just in case return {"Error": "Task should have been canceled"} try: + is_normal_run = invoker.is_vanilla_run() signature = await invoker.start_task() if send_signature: kwargs["signature"] = signature @@ -50,6 +51,8 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): else: result = await flexible_call(func, message, ctx, *args, **kwargs) except (Exception, asyncio.CancelledError) as e: + if is_normal_run: + raise if not task_model.should_retry(ctx.attempt_number, e): await signature.failed() await invoker.run_error() diff --git a/mageflow/invokers/hatchet.py b/mageflow/invokers/hatchet.py index bb2eae5..f7be9cf 100644 --- a/mageflow/invokers/hatchet.py +++ b/mageflow/invokers/hatchet.py @@ -28,6 +28,13 @@ def __init__(self, message: BaseModel, ctx: Context): def task_ctx(self) -> dict: return self.task_data + @property + def task_id(self) -> str | None: + return self.task_data.get(TASK_ID_PARAM_NAME, None) + + def is_vanilla_run(self): + return self.task_id + async def start_task(self) -> TaskSignature | None: task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) if task_id: From fc31c24c9220ab58a969c40d7b150516d3003cc6 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:30:27 +0200 Subject: [PATCH 04/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - fix check --- mageflow/invokers/hatchet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mageflow/invokers/hatchet.py b/mageflow/invokers/hatchet.py index f7be9cf..8223ca4 100644 --- a/mageflow/invokers/hatchet.py +++ b/mageflow/invokers/hatchet.py @@ -33,7 +33,7 @@ def task_id(self) -> str | None: return self.task_data.get(TASK_ID_PARAM_NAME, None) def is_vanilla_run(self): - return self.task_id + return self.task_id is None async def start_task(self) -> TaskSignature | None: task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) From cc8f0b933b367fed25fc67d26ab5db2e6a4049c1 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:31:17 +0200 Subject: [PATCH 05/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - normal run return message as is --- mageflow/callbacks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mageflow/callbacks.py b/mageflow/callbacks.py index f363f33..46e05cb 100644 --- a/mageflow/callbacks.py +++ b/mageflow/callbacks.py @@ -59,6 +59,8 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): await invoker.remove_task(with_error=False) raise else: + if is_normal_run: + return result task_results = HatchetResult(hatchet_results=result) dumped_results = task_results.model_dump(mode="json") await invoker.run_success(dumped_results["hatchet_results"]) From a76a6cdaba9d6a4f031aaba52d9dc9badc61d4d3 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:39:43 +0200 Subject: [PATCH 06/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - fix test, we check the normal output is returned --- tests/integration/hatchet/assertions.py | 6 ------ tests/integration/hatchet/signature/test__signature.py | 9 ++++++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/integration/hatchet/assertions.py b/tests/integration/hatchet/assertions.py index fe78c55..f3cf79d 100644 --- a/tests/integration/hatchet/assertions.py +++ b/tests/integration/hatchet/assertions.py @@ -94,12 +94,6 @@ def get_task_param(wf: V1TaskSummary, param_name: str): return wf.additional_metadata.get(TASK_DATA_PARAM_NAME, {}).get(param_name) -def assert_task_done(runs: HatchetRuns, task, input_params=None, results=None): - __tracebackhide__ = False # force pytest to show this frame - workflows_by_name = {wf.workflow_name: wf for wf in runs} - return _assert_task_done(task.name, workflows_by_name, input_params, results) - - def assert_signature_done( runs: HatchetRuns, task_sign: TaskSignature | TaskIdentifierType, diff --git a/tests/integration/hatchet/signature/test__signature.py b/tests/integration/hatchet/signature/test__signature.py index de92612..440ac9b 100644 --- a/tests/integration/hatchet/signature/test__signature.py +++ b/tests/integration/hatchet/signature/test__signature.py @@ -1,12 +1,12 @@ import asyncio import pytest +from hatchet_sdk.clients.rest import V1TaskStatus import mageflow from mageflow.signature.model import TaskSignature from mageflow.utils.models import return_value_field from tests.integration.hatchet.assertions import ( - assert_task_done, assert_redis_is_clean, assert_signature_done, get_runs, @@ -221,10 +221,13 @@ async def test__call_signed_task_with_normal_workflow__check_task_is_done( await task1_callback.aio_run_no_wait(message, options=trigger_options) # Assert - await asyncio.sleep(10) + await asyncio.sleep(3) runs = await get_runs(hatchet, ctx_metadata) - assert_task_done(runs, task1_callback, results=message.model_dump()) + assert len(runs) == 1 + task_summary = runs[0] + assert task_summary.status == V1TaskStatus.COMPLETED + assert task_summary.output == message.model_dump() @pytest.mark.asyncio(loop_scope="session") From 67c5ecc254759ca8365387dd3d95381ad861d1e3 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:39:51 +0200 Subject: [PATCH 07/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - shorter time --- tests/integration/hatchet/signature/test__signature.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/hatchet/signature/test__signature.py b/tests/integration/hatchet/signature/test__signature.py index 440ac9b..418f653 100644 --- a/tests/integration/hatchet/signature/test__signature.py +++ b/tests/integration/hatchet/signature/test__signature.py @@ -202,7 +202,7 @@ async def test_task_with_failure_callback_execution_and_redis_cleanup_sanity( await task.aio_run_no_wait(message, options=trigger_options) # Assert - await asyncio.sleep(10) + await asyncio.sleep(5) runs = await get_runs(hatchet, ctx_metadata) assert_signature_done(runs, task, base_data=test_ctx, allow_fails=True) assert_signature_done(runs, error_callback_signature, base_data=test_ctx) @@ -249,7 +249,7 @@ async def test__call_task_that_return_multiple_values_of_basemodel__sanity( await return_multiple_values_sign.aio_run_no_wait(message, options=trigger_options) # Assert - await asyncio.sleep(10) + await asyncio.sleep(5) runs = await get_runs(hatchet, ctx_metadata) assert_signature_done(runs, return_multiple_values_sign, **message.model_dump()) assert_signature_done( From 6abf580f6992df5c2660c9ef32fe8a236a269255 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:40:13 +0200 Subject: [PATCH 08/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - use task_id property --- mageflow/invokers/hatchet.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mageflow/invokers/hatchet.py b/mageflow/invokers/hatchet.py index 8223ca4..e965e16 100644 --- a/mageflow/invokers/hatchet.py +++ b/mageflow/invokers/hatchet.py @@ -36,7 +36,7 @@ def is_vanilla_run(self): return self.task_id is None async def start_task(self) -> TaskSignature | None: - task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) + task_id = self.task_id if task_id: async with TaskSignature.alock_from_key(task_id) as signature: await signature.change_status(SignatureStatus.ACTIVE) @@ -46,7 +46,7 @@ async def start_task(self) -> TaskSignature | None: async def run_success(self, result: Any) -> bool: success_publish_tasks = [] - task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) + task_id = self.task_id if task_id: current_task = await TaskSignature.get_safe(task_id) task_success_workflows = current_task.activate_success(result) @@ -60,7 +60,7 @@ async def run_success(self, result: Any) -> bool: async def run_error(self) -> bool: error_publish_tasks = [] - task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) + task_id = self.task_id if task_id: current_task = await TaskSignature.get_safe(task_id) task_error_workflows = current_task.activate_error(self.message) @@ -74,7 +74,7 @@ async def run_error(self) -> bool: async def remove_task( self, with_success: bool = True, with_error: bool = True ) -> TaskSignature | None: - task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) + task_id = self.task_id if task_id: signature = await TaskSignature.get_safe(task_id) if signature: @@ -82,7 +82,7 @@ async def remove_task( return None async def should_run_task(self) -> bool: - task_id = self.task_data.get(TASK_ID_PARAM_NAME, None) + task_id = self.task_id if task_id: signature = await TaskSignature.get_safe(task_id) if signature is None: From 21306f89294335a9156b4cd79330cb8c8b17586a Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:41:40 +0200 Subject: [PATCH 09/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - we created a task_fail in invoker, merge remove --- mageflow/callbacks.py | 3 +-- mageflow/invokers/base.py | 2 +- mageflow/invokers/hatchet.py | 10 +++++----- .../hatchet/signature/test_edge_case.py | 16 ++++++++++++++++ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/mageflow/callbacks.py b/mageflow/callbacks.py index 46e05cb..0e3b4bd 100644 --- a/mageflow/callbacks.py +++ b/mageflow/callbacks.py @@ -55,8 +55,7 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): raise if not task_model.should_retry(ctx.attempt_number, e): await signature.failed() - await invoker.run_error() - await invoker.remove_task(with_error=False) + await invoker.task_failed() raise else: if is_normal_run: diff --git a/mageflow/invokers/base.py b/mageflow/invokers/base.py index 94dff09..7ffeb71 100644 --- a/mageflow/invokers/base.py +++ b/mageflow/invokers/base.py @@ -22,7 +22,7 @@ async def run_success(self, result: Any) -> bool: pass @abc.abstractmethod - async def run_error(self) -> bool: + async def task_failed(self) -> bool: pass @abc.abstractmethod diff --git a/mageflow/invokers/hatchet.py b/mageflow/invokers/hatchet.py index e965e16..e1b68b2 100644 --- a/mageflow/invokers/hatchet.py +++ b/mageflow/invokers/hatchet.py @@ -58,7 +58,7 @@ async def run_success(self, result: Any) -> bool: return True return False - async def run_error(self) -> bool: + async def task_failed(self): error_publish_tasks = [] task_id = self.task_id if task_id: @@ -66,10 +66,10 @@ async def run_error(self) -> bool: task_error_workflows = current_task.activate_error(self.message) error_publish_tasks.append(asyncio.create_task(task_error_workflows)) - if error_publish_tasks: - await asyncio.gather(*error_publish_tasks) - return True - return False + if error_publish_tasks: + await asyncio.gather(*error_publish_tasks) + + await current_task.remove(with_error=False) async def remove_task( self, with_success: bool = True, with_error: bool = True diff --git a/tests/integration/hatchet/signature/test_edge_case.py b/tests/integration/hatchet/signature/test_edge_case.py index 2814220..712eca8 100644 --- a/tests/integration/hatchet/signature/test_edge_case.py +++ b/tests/integration/hatchet/signature/test_edge_case.py @@ -168,3 +168,19 @@ async def test_check_normal_task_fails__sanity( assert err_msg.startswith( error_class_name ), f"{err_msg} doesn't start with {error_class_name}" + + +@pytest.mark.asyncio(loop_scope="session") +async def test_retry_normal_tasks__sanity( + hatchet_client_init: HatchetInitData, test_ctx, ctx_metadata, trigger_options +): + # Arrange + redis_client, hatchet = ( + hatchet_client_init.redis_client, + hatchet_client_init.hatchet, + ) + + # Act + message = ContextMessage(base_data=test_ctx) + await retry_once.aio_run_no_wait(message, options=trigger_options) + await asyncio.sleep(10) From f0c9d452f99e305f07aa2c6b302cd9fcd890a282 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 15:43:01 +0200 Subject: [PATCH 10/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - we set as fail after publish error --- mageflow/callbacks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mageflow/callbacks.py b/mageflow/callbacks.py index 0e3b4bd..a3ecf1a 100644 --- a/mageflow/callbacks.py +++ b/mageflow/callbacks.py @@ -54,8 +54,8 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): if is_normal_run: raise if not task_model.should_retry(ctx.attempt_number, e): - await signature.failed() await invoker.task_failed() + await signature.failed() raise else: if is_normal_run: From cb7bf4acc93196f9bb3ac52235da97385df97f57 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 16:12:37 +0200 Subject: [PATCH 11/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - done and fail is called after callbacks. rename to task success --- mageflow/callbacks.py | 4 ++-- mageflow/invokers/base.py | 2 +- mageflow/invokers/hatchet.py | 11 +++++------ 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/mageflow/callbacks.py b/mageflow/callbacks.py index a3ecf1a..5e699d6 100644 --- a/mageflow/callbacks.py +++ b/mageflow/callbacks.py @@ -62,8 +62,8 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): return result task_results = HatchetResult(hatchet_results=result) dumped_results = task_results.model_dump(mode="json") - await invoker.run_success(dumped_results["hatchet_results"]) - await invoker.remove_task(with_success=False) + await invoker.task_success(dumped_results["hatchet_results"]) + await current_task.done() if wrap_res: return task_results else: diff --git a/mageflow/invokers/base.py b/mageflow/invokers/base.py index 7ffeb71..f160d29 100644 --- a/mageflow/invokers/base.py +++ b/mageflow/invokers/base.py @@ -18,7 +18,7 @@ async def start_task(self): pass @abc.abstractmethod - async def run_success(self, result: Any) -> bool: + async def task_success(self, result: Any) -> bool: pass @abc.abstractmethod diff --git a/mageflow/invokers/hatchet.py b/mageflow/invokers/hatchet.py index e1b68b2..0ff7452 100644 --- a/mageflow/invokers/hatchet.py +++ b/mageflow/invokers/hatchet.py @@ -44,19 +44,18 @@ async def start_task(self) -> TaskSignature | None: return signature return None - async def run_success(self, result: Any) -> bool: + async def task_success(self, result: Any): success_publish_tasks = [] task_id = self.task_id if task_id: current_task = await TaskSignature.get_safe(task_id) task_success_workflows = current_task.activate_success(result) - await current_task.done() success_publish_tasks.append(asyncio.create_task(task_success_workflows)) - if success_publish_tasks: - await asyncio.gather(*success_publish_tasks) - return True - return False + if success_publish_tasks: + await asyncio.gather(*success_publish_tasks) + + await current_task.remove(with_success=False) async def task_failed(self): error_publish_tasks = [] From 77d8301c5a9e5322ebf847a5a6224272b850d0b9 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 16:13:58 +0200 Subject: [PATCH 12/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - unused task --- mageflow/invokers/hatchet.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/mageflow/invokers/hatchet.py b/mageflow/invokers/hatchet.py index 0ff7452..dfa5906 100644 --- a/mageflow/invokers/hatchet.py +++ b/mageflow/invokers/hatchet.py @@ -70,16 +70,6 @@ async def task_failed(self): await current_task.remove(with_error=False) - async def remove_task( - self, with_success: bool = True, with_error: bool = True - ) -> TaskSignature | None: - task_id = self.task_id - if task_id: - signature = await TaskSignature.get_safe(task_id) - if signature: - await signature.remove(with_error, with_success) - return None - async def should_run_task(self) -> bool: task_id = self.task_id if task_id: From 81b45b02d50a072132137d53bbc077cc2e918be0 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 16:17:57 +0200 Subject: [PATCH 13/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - remove task is not needed --- mageflow/invokers/base.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/mageflow/invokers/base.py b/mageflow/invokers/base.py index f160d29..3b53545 100644 --- a/mageflow/invokers/base.py +++ b/mageflow/invokers/base.py @@ -25,12 +25,6 @@ async def task_success(self, result: Any) -> bool: async def task_failed(self) -> bool: pass - @abc.abstractmethod - async def remove_task( - self, with_success: bool = True, with_error: bool = True - ) -> TaskSignature | None: - pass - @abc.abstractmethod async def should_run_task(self) -> bool: pass From 9704833a1714a2aee198ca135a832a183aa6b9c8 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 16:20:56 +0200 Subject: [PATCH 14/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - fix signature name --- mageflow/callbacks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mageflow/callbacks.py b/mageflow/callbacks.py index 5e699d6..6a8669a 100644 --- a/mageflow/callbacks.py +++ b/mageflow/callbacks.py @@ -63,7 +63,7 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs): task_results = HatchetResult(hatchet_results=result) dumped_results = task_results.model_dump(mode="json") await invoker.task_success(dumped_results["hatchet_results"]) - await current_task.done() + await signature.done() if wrap_res: return task_results else: From 32ed5c482c430f8328446bf6bc424c463f9e024a Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 16:22:28 +0200 Subject: [PATCH 15/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - test can be shorter --- tests/integration/hatchet/signature/test_edge_case.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/hatchet/signature/test_edge_case.py b/tests/integration/hatchet/signature/test_edge_case.py index 712eca8..2309c41 100644 --- a/tests/integration/hatchet/signature/test_edge_case.py +++ b/tests/integration/hatchet/signature/test_edge_case.py @@ -183,4 +183,4 @@ async def test_retry_normal_tasks__sanity( # Act message = ContextMessage(base_data=test_ctx) await retry_once.aio_run_no_wait(message, options=trigger_options) - await asyncio.sleep(10) + await asyncio.sleep(5) From 31a8401e484a73302486c19201a6aec0797a4c5e Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 17:17:26 +0200 Subject: [PATCH 16/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - we dont need to update signature models --- mageflow/signature/model.py | 1 - mageflow/startup.py | 15 --------------- tests/unit/conftest.py | 7 ++++--- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/mageflow/signature/model.py b/mageflow/signature/model.py index fb7afdd..9f7f1c5 100644 --- a/mageflow/signature/model.py +++ b/mageflow/signature/model.py @@ -320,5 +320,4 @@ async def lock_from_key( await redis_model.asave() -SIGNATURES_NAME_MAPPING: dict[str, type[TaskSignature]] = {} TaskInputType: TypeAlias = TaskIdentifierType | TaskSignature diff --git a/mageflow/startup.py b/mageflow/startup.py index 94281dd..40003bf 100644 --- a/mageflow/startup.py +++ b/mageflow/startup.py @@ -25,27 +25,12 @@ class MageFlowConfigModel(ConfigModel): async def init_mageflow(): await rapyer.init_rapyer(mageflow_config.redis_client, prefer_normal_json_dump=True) await register_workflows() - await update_register_signature_models() async def teardown_mageflow(): await rapyer.teardown_rapyer() -async def update_register_signature_models(): - from mageflow.signature.model import SIGNATURES_NAME_MAPPING, TaskSignature - - signature_classes = [ - cls for cls in rapyer.find_redis_models() if issubclass(cls, TaskSignature) - ] - SIGNATURES_NAME_MAPPING.update( - { - signature_class.__name__: signature_class - for signature_class in signature_classes - } - ) - - async def register_workflows(): for reg_task in REGISTERED_TASKS: workflow, mageflow_task_name = reg_task diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 838d8b8..10d9b3e 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -11,7 +11,7 @@ from mageflow.invokers.hatchet import HatchetInvoker from mageflow.signature.consts import TASK_ID_PARAM_NAME from mageflow.signature.model import TaskSignature -from mageflow.startup import update_register_signature_models, mageflow_config +from mageflow.startup import mageflow_config from mageflow.swarm.messages import SwarmResultsMessage from mageflow.swarm.model import SwarmTaskSignature, BatchItemTaskSignature, SwarmConfig from mageflow.workflows import MageflowWorkflow @@ -44,7 +44,6 @@ class SwarmItemDoneSetup: @pytest_asyncio.fixture(autouse=True, scope="function") async def redis_client(): - await update_register_signature_models() client = fakeredis.aioredis.FakeRedis() mageflow_config.redis_client = client await client.flushall() @@ -292,7 +291,9 @@ def mock_workflow_run_with_args(): captured_calls = [] async def capture_and_mock(self, *args, **kwargs): - captured_calls.append(WorkflowCallCapture(workflow=self, args=args, kwargs=kwargs)) + captured_calls.append( + WorkflowCallCapture(workflow=self, args=args, kwargs=kwargs) + ) with patch.object(MageflowWorkflow, "aio_run_no_wait", capture_and_mock): yield captured_calls From b6e2a479d2b29929680e1a62633f93434db46442 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 17:27:21 +0200 Subject: [PATCH 17/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - optimize --- mageflow/invokers/base.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/mageflow/invokers/base.py b/mageflow/invokers/base.py index 3b53545..a853460 100644 --- a/mageflow/invokers/base.py +++ b/mageflow/invokers/base.py @@ -4,8 +4,6 @@ from pydantic import BaseModel -from mageflow.signature.model import TaskSignature - class BaseInvoker(ABC): @property From 93fdab4eee4444629436b337ce4e118e3078a55b Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 17:36:34 +0200 Subject: [PATCH 18/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - added tests for normal tasks --- .../hatchet/signature/test_edge_case.py | 68 ++++++++++++++++--- tests/integration/hatchet/worker.py | 9 +++ 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/tests/integration/hatchet/signature/test_edge_case.py b/tests/integration/hatchet/signature/test_edge_case.py index 2309c41..7aa54c9 100644 --- a/tests/integration/hatchet/signature/test_edge_case.py +++ b/tests/integration/hatchet/signature/test_edge_case.py @@ -1,6 +1,6 @@ import asyncio from datetime import datetime - +from hatchet_sdk import NonRetryableException import pytest from hatchet_sdk.clients.rest import V1TaskStatus @@ -22,6 +22,7 @@ task1_callback, cancel_retry, fail_task, + normal_retry_once, ) @@ -147,10 +148,7 @@ async def test_check_normal_task_fails__sanity( hatchet_client_init: HatchetInitData, test_ctx, ctx_metadata, trigger_options ): # Arrange - redis_client, hatchet = ( - hatchet_client_init.redis_client, - hatchet_client_init.hatchet, - ) + hatchet = hatchet_client_init.hatchet # Act message = ContextMessage(base_data=test_ctx) @@ -175,12 +173,62 @@ async def test_retry_normal_tasks__sanity( hatchet_client_init: HatchetInitData, test_ctx, ctx_metadata, trigger_options ): # Arrange - redis_client, hatchet = ( - hatchet_client_init.redis_client, - hatchet_client_init.hatchet, - ) + hatchet = hatchet_client_init.hatchet + + # Act + message = ContextMessage(base_data=test_ctx) + await normal_retry_once.aio_run_no_wait(message, options=trigger_options) + await asyncio.sleep(5) + + # Assert + runs = await get_runs(hatchet, ctx_metadata) + + assert len(runs) == 1 + retry_task_summary = runs[0] + assert retry_task_summary.status == V1TaskStatus.COMPLETED + assert retry_task_summary.retry_count == 1 + assert retry_task_summary.output == message.model_dump(mode="json") + + +@pytest.mark.asyncio(loop_scope="session") +async def test_cancel_task_output__sanity( + hatchet_client_init: HatchetInitData, test_ctx, ctx_metadata, trigger_options +): + # Arrange + hatchet = hatchet_client_init.hatchet + + # Act + message = ContextMessage(base_data=test_ctx) + await cancel_retry.aio_run_no_wait(message, options=trigger_options) + await asyncio.sleep(5) + + # Assert + runs = await get_runs(hatchet, ctx_metadata) + + assert len(runs) == 1 + cancel_task_summary = runs[0] + assert cancel_task_summary.status == V1TaskStatus.FAILED + assert cancel_task_summary.retry_count == 0 + assert NonRetryableException.__name__ in cancel_task_summary.error_message + + +@pytest.mark.asyncio(loop_scope="session") +async def test_timeout_task_output__sanity( + hatchet_client_init: HatchetInitData, test_ctx, ctx_metadata, trigger_options +): + # Arrange + hatchet = hatchet_client_init.hatchet # Act message = ContextMessage(base_data=test_ctx) - await retry_once.aio_run_no_wait(message, options=trigger_options) + await timeout_task.aio_run_no_wait(message, options=trigger_options) await asyncio.sleep(5) + + # Assert + runs = await get_runs(hatchet, ctx_metadata) + + assert len(runs) == 1 + timeout_task_summary = runs[0] + assert timeout_task_summary.status == V1TaskStatus.FAILED + # No error message + assert not timeout_task_summary.error_message diff --git a/tests/integration/hatchet/worker.py b/tests/integration/hatchet/worker.py index d685042..2b3b10d 100644 --- a/tests/integration/hatchet/worker.py +++ b/tests/integration/hatchet/worker.py @@ -133,6 +133,14 @@ async def retry_once(msg, ctx: Context): return "Nice" +@hatchet.task(retries=3, execution_timeout=60) +@hatchet.with_ctx +async def normal_retry_once(msg, ctx: Context): + if ctx.attempt_number == 1: + raise ValueError("Test exception") + return msg + + @hatchet.task(retries=3, execution_timeout=60) @hatchet.with_signature async def retry_to_failure(msg, signature: TaskSignature): @@ -162,6 +170,7 @@ async def cancel_retry(msg): return_multiple_values, timeout_task, retry_once, + normal_retry_once, retry_to_failure, cancel_retry, ] From 489471d85783e2c55af7150a79e8ae6af23476e0 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 23:21:55 +0200 Subject: [PATCH 19/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - no more return types for invoker --- mageflow/invokers/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mageflow/invokers/base.py b/mageflow/invokers/base.py index a853460..d069e43 100644 --- a/mageflow/invokers/base.py +++ b/mageflow/invokers/base.py @@ -16,11 +16,11 @@ async def start_task(self): pass @abc.abstractmethod - async def task_success(self, result: Any) -> bool: + async def task_success(self, result: Any): pass @abc.abstractmethod - async def task_failed(self) -> bool: + async def task_failed(self): pass @abc.abstractmethod From 37e23566c3867b7d8b676103af39a9e9a97a4226 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Tue, 3 Feb 2026 23:59:12 +0200 Subject: [PATCH 20/20] [54-fix-task-failure-handling-when-task-is-not-a-signature] - added security scans --- .github/workflows/bandit.yml | 46 ++++++++++++++++++ .github/workflows/codeql.yml | 10 ++-- .github/workflows/pm.yml | 51 +++++++++++++++++++ .github/workflows/security.yml | 89 ++++++++++++++++++++++++++++++++++ .github/workflows/semgrep.yml | 53 ++++++++++++++++++++ 5 files changed, 246 insertions(+), 3 deletions(-) create mode 100644 .github/workflows/bandit.yml create mode 100644 .github/workflows/pm.yml create mode 100644 .github/workflows/security.yml create mode 100644 .github/workflows/semgrep.yml diff --git a/.github/workflows/bandit.yml b/.github/workflows/bandit.yml new file mode 100644 index 0000000..af45d46 --- /dev/null +++ b/.github/workflows/bandit.yml @@ -0,0 +1,46 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +# Bandit is a security linter designed to find common security issues in Python code. +# This action will run Bandit on your codebase. +# The results of the scan will be found under the Security tab of your repository. + +# https://github.com/marketplace/actions/bandit-scan is ISC licensed, by abirismyname +# https://pypi.org/project/bandit/ is Apache v2.0 licensed, by PyCQA + +name: Bandit +on: + push: + branches: [ "main", "develop" ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ "main" ] + schedule: + - cron: '31 8 * * 5' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + bandit: + permissions: + contents: read # for actions/checkout to fetch code + security-events: write # for github/codeql-action/upload-sarif to upload SARIF results + actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Bandit Scan + uses: shundor/python-bandit-scan@ab1d87dfccc5a0ffab88be3aaac6ffe35c10d6cd + with: + exit_zero: true + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # Exclude test directories - B101 (assert_used) is valid in tests + excluded_paths: tests + # Skip B101 globally as asserts are valid for runtime checks too + skips: B101 + diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index a07e1fd..c949d6a 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -13,11 +13,15 @@ name: "CodeQL Advanced" on: push: - branches: [ "main" ] + branches: [ "main", "develop" ] pull_request: - branches: [ "main" ] + branches: [ "main", "develop" ] schedule: - - cron: '24 17 * * 1' + - cron: '45 2 * * 0' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true jobs: analyze: diff --git a/.github/workflows/pm.yml b/.github/workflows/pm.yml new file mode 100644 index 0000000..84ebb53 --- /dev/null +++ b/.github/workflows/pm.yml @@ -0,0 +1,51 @@ +name: Close issues related to a merged pull request based on master branch. + +on: + pull_request: + types: [closed] + branches: + - develop + - main + +permissions: + issues: write + +jobs: + closeIssueOnPrMergeTrigger: + if: github.event.pull_request.merged == true + runs-on: ubuntu-latest + + steps: + - name: Closes issues related to a merged pull request. + uses: ldez/gha-mjolnir@v1.4.1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract issue number from branch name and close + uses: actions/github-script@v7 + with: + script: | + const branchName = context.payload.pull_request.head.ref; + // Match patterns: feature/86-..., fix/123-..., 86-..., issue-86 + const match = branchName.match(/(?:^|\/|-)(\d+)(?:-|$)/); + if (match) { + const issueNumber = parseInt(match[1]); + try { + const issue = await github.rest.issues.get({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: issueNumber + }); + if (issue.data.state === 'open') { + await github.rest.issues.update({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: issueNumber, + state: 'closed', + state_reason: 'completed' + }); + } + } catch (error) { + if (error.status !== 404) throw error; + } + } diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml new file mode 100644 index 0000000..2b95184 --- /dev/null +++ b/.github/workflows/security.yml @@ -0,0 +1,89 @@ +name: Security Scan + +on: + push: + branches: [main, develop] + pull_request: + branches: [main, develop] + schedule: + # Run weekly on Monday at 9am UTC + - cron: '0 9 * * 1' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +permissions: + contents: read + security-events: write + +jobs: + pip-audit: + name: Dependency Vulnerability Scan + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install tools + run: | + python -m pip install --upgrade pip pipx + pip install pip-audit + pipx install poetry + pipx inject poetry poetry-plugin-export + + - name: Export and audit dependencies + run: | + poetry export -f requirements.txt --without-hashes -o requirements-audit.txt + pip-audit --strict -r requirements-audit.txt + + + secrets-scan: + name: Secrets Detection + environment: + name: security + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Run Gitleaks + uses: gitleaks/gitleaks-action@v2 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITLEAKS_LICENSE: ${{ secrets.GITLEAKS_LICENSE }} + + summary: + name: Security Summary + runs-on: ubuntu-latest + needs: [pip-audit, secrets-scan] + if: always() + steps: + - name: Check results + run: | + echo "## Security Scan Summary" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + + if [ "${{ needs.pip-audit.result }}" == "success" ]; then + echo "✅ pip-audit: Passed" >> $GITHUB_STEP_SUMMARY + else + echo "❌ pip-audit: Failed" >> $GITHUB_STEP_SUMMARY + fi + + if [ "${{ needs.secrets-scan.result }}" == "success" ]; then + echo "✅ Secrets scan: Passed" >> $GITHUB_STEP_SUMMARY + else + echo "❌ Secrets scan: Failed" >> $GITHUB_STEP_SUMMARY + fi + + - name: Fail if any check failed + if: | + needs.pip-audit.result == 'failure' || + needs.secrets-scan.result == 'failure' + run: exit 1 \ No newline at end of file diff --git a/.github/workflows/semgrep.yml b/.github/workflows/semgrep.yml new file mode 100644 index 0000000..8149ea0 --- /dev/null +++ b/.github/workflows/semgrep.yml @@ -0,0 +1,53 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +# This workflow file requires a free account on Semgrep.dev to +# manage rules, file ignores, notifications, and more. +# +# See https://semgrep.dev/docs + +name: Semgrep + +on: + push: + branches: [ "main", "develop" ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ "main" ] + schedule: + - cron: '29 23 * * 6' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +permissions: + contents: read + +jobs: + semgrep: + permissions: + contents: read # for actions/checkout to fetch code + security-events: write # for github/codeql-action/upload-sarif to upload SARIF results + actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status + name: Scan + runs-on: ubuntu-latest + steps: + # Checkout project source + - uses: actions/checkout@v4 + + # Scan code using project's configuration on https://semgrep.dev/manage + - uses: returntocorp/semgrep-action@fcd5ab7459e8d91cb1777481980d1b18b4fc6735 + with: + publishToken: ${{ secrets.SEMGREP_APP_TOKEN }} + publishDeployment: ${{ secrets.SEMGREP_DEPLOYMENT_ID }} + generateSarif: "1" + + # Upload SARIF file generated in previous step + - name: Upload SARIF file + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: semgrep.sarif + if: always()