Skip to content

Conversation

@yedidyakfir
Copy link
Collaborator

Summary

Refactor task failure handling to properly support normal (non-signature) tasks and update for Hatchet SDK compatibility up to version 1.23.0. Also adds dynamic roadmap generation during deployment.

Changes

  • Task failure handling refactor: Normal tasks now raise errors directly instead of going through signature-based error handling
  • Invoker API simplification: Replaced run_success, run_error, and remove_task methods with simpler task_success and task_failed methods
  • Added is_vanilla_run() method: Detects when a task is running without a signature (normal Hatchet task)
  • Added task_id property: Centralized task ID access in the invoker
  • Removed unused code: Removed SIGNATURES_NAME_MAPPING and update_register_signature_models() startup function
  • Dynamic roadmap generation: Added script to generate roadmap.md from GitHub issues/milestones during docs deployment

Testing

  • Added tests for normal task failure behavior (test_check_normal_task_fails__sanity)
  • Added tests for normal task retry behavior (test_retry_normal_tasks__sanity)
  • Added tests for cancel and timeout task outputs
  • Existing signature-based tests continue to pass

…est to check that normal fail task raises the correct error message
…f task is normal task, if true raise error noramlly
…d fail is called after callbacks. rename to task success
Copilot AI review requested due to automatic review settings February 3, 2026 22:01
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors Hatchet task invocation/failure handling to support “vanilla” (non-signature) tasks and updates integration/docs tooling for Hatchet SDK compatibility up to 1.23.0, including generating the docs roadmap from GitHub issues during deployment.

Changes:

  • Refactored invoker/callback flow: simplified invoker API (task_success/task_failed), added task_id, and introduced detection of vanilla runs.
  • Updated integration/unit tests to cover vanilla task failure/retry behavior and adjusted assertions for newer Hatchet SDK output/status fields.
  • Added GitHub Actions-driven roadmap generation (docs/roadmap.md) via a new script, and wired it into the docs deployment workflow.

Reviewed changes

Copilot reviewed 14 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
mageflow/callbacks.py Adjusts wrapper behavior to differentiate vanilla runs vs signature runs; updates success/failure handling calls.
mageflow/invokers/base.py Simplifies invoker interface to task_success / task_failed.
mageflow/invokers/hatchet.py Adds task_id + vanilla run detection; updates task completion/failure publishing and cleanup flow.
mageflow/startup.py Removes signature model registration step during startup.
mageflow/signature/model.py Removes unused SIGNATURES_NAME_MAPPING.
tests/unit/conftest.py Removes startup hook for signature model registration; minor formatting tweak.
tests/integration/hatchet/worker.py Adds a vanilla retry task; changes failure task to raise a custom test exception.
tests/integration/hatchet/models.py Adds MageflowTestError for consistent error-message assertions.
tests/integration/hatchet/assertions.py Removes the public assert_task_done helper (internal assertion helpers remain).
tests/integration/hatchet/signature/test__signature.py Updates assertions to validate status/output with newer Hatchet SDK shapes.
tests/integration/hatchet/signature/test_edge_case.py Adds vanilla-task failure/retry/cancel/timeout “sanity” tests and new SDK imports.
docs/roadmap.md Removed from the repo to be generated during docs deployment.
.gitignore Ignores generated docs/roadmap.md.
.github/workflows/docs.yml Adds roadmap generation step and triggers on issue/milestone events.
.github/scripts/generate_roadmap.py New script to generate docs/roadmap.md from GitHub milestones/issues.
Comments suppressed due to low confidence (1)

.github/workflows/docs.yml:23

  • The workflow generates docs/roadmap.md by calling the GitHub Issues/Milestones REST APIs, but permissions does not grant issues: read. With the explicit permissions: block, any unspecified scopes default to none, so these API calls can 403 and fail the docs deployment. Add issues: read (and keep contents: read/pages: write/id-token: write as-is).
permissions:
  contents: read
  pages: write
  id-token: write

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +11 to +18
def make_request(url, token=None):
headers = {"Accept": "application/vnd.github.v3+json"}
if token:
headers["Authorization"] = f"token {token}"

req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as response:
return json.loads(response.read().decode())
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GitHub REST API requests should include a User-Agent header; without it, the API can reject requests. Update make_request to always send a clear User-Agent (and keep the existing Accept/Authorization headers).

Copilot uses AI. Check for mistakes.
Comment on lines +57 to 58
await invoker.task_failed()
await signature.failed()
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the failure path, signature.failed() is called after invoker.task_failed(). If publishing error callbacks (or cleanup) raises, the signature status may remain ACTIVE and the signature may never be marked FAILED. To preserve the previous behavior, set the signature status to FAILED before publishing callbacks (or ensure it happens in a finally).

Suggested change
await invoker.task_failed()
await signature.failed()
await signature.failed()
await invoker.task_failed()

Copilot uses AI. Check for mistakes.
Comment on lines +65 to +66
await invoker.task_success(dumped_results["hatchet_results"])
await signature.done()
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the success path, signature.done() is called after invoker.task_success(). If publishing success callbacks raises, the signature status may remain ACTIVE and never transition to DONE. Consider marking the signature DONE before publishing callbacks (or ensure status update happens even if callback publishing fails).

Suggested change
await invoker.task_success(dumped_results["hatchet_results"])
await signature.done()
await signature.done()
await invoker.task_success(dumped_results["hatchet_results"])

Copilot uses AI. Check for mistakes.
def task_id(self) -> str | None:
return self.task_data.get(TASK_ID_PARAM_NAME, None)

def is_vanilla_run(self):
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HatchetInvoker.is_vanilla_run has no return type annotation. Since it’s used as a boolean gate in task execution, add -> bool (and consider making it a @property for consistency with task_id).

Suggested change
def is_vanilla_run(self):
def is_vanilla_run(self) -> bool:

Copilot uses AI. Check for mistakes.
Comment on lines +47 to +71
async def task_success(self, result: Any):
success_publish_tasks = []
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
task_id = self.task_id
if task_id:
current_task = await TaskSignature.get_safe(task_id)
task_success_workflows = current_task.activate_success(result)
await current_task.done()
success_publish_tasks.append(asyncio.create_task(task_success_workflows))

if success_publish_tasks:
await asyncio.gather(*success_publish_tasks)
return True
return False
if success_publish_tasks:
await asyncio.gather(*success_publish_tasks)

await current_task.remove(with_success=False)

async def run_error(self) -> bool:
async def task_failed(self):
error_publish_tasks = []
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
task_id = self.task_id
if task_id:
current_task = await TaskSignature.get_safe(task_id)
task_error_workflows = current_task.activate_error(self.message)
error_publish_tasks.append(asyncio.create_task(task_error_workflows))

if error_publish_tasks:
await asyncio.gather(*error_publish_tasks)
return True
return False
if error_publish_tasks:
await asyncio.gather(*error_publish_tasks)

async def remove_task(
self, with_success: bool = True, with_error: bool = True
) -> TaskSignature | None:
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
if task_id:
signature = await TaskSignature.get_safe(task_id)
if signature:
await signature.remove(with_error, with_success)
return None
await current_task.remove(with_error=False)
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_success/task_failed build a list, create exactly one task, then immediately gather it. This adds complexity without enabling real parallelism. Consider directly awaiting current_task.activate_success(...) / activate_error(...) (or, if you intend concurrency, justify/extend it to multiple tasks).

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants