Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ac7f9ea
[54-fix-task-failure-handling-when-task-is-not-a-signature] - fail ra…
yedidyakfir Feb 3, 2026
bf85263
[54-fix-task-failure-handling-when-task-is-not-a-signature] - added t…
yedidyakfir Feb 3, 2026
2d7a93e
[54-fix-task-failure-handling-when-task-is-not-a-signature] - check i…
yedidyakfir Feb 3, 2026
fc31c24
[54-fix-task-failure-handling-when-task-is-not-a-signature] - fix check
yedidyakfir Feb 3, 2026
cc8f0b9
[54-fix-task-failure-handling-when-task-is-not-a-signature] - normal …
yedidyakfir Feb 3, 2026
a76a6cd
[54-fix-task-failure-handling-when-task-is-not-a-signature] - fix tes…
yedidyakfir Feb 3, 2026
67c5ecc
[54-fix-task-failure-handling-when-task-is-not-a-signature] - shorter…
yedidyakfir Feb 3, 2026
6abf580
[54-fix-task-failure-handling-when-task-is-not-a-signature] - use tas…
yedidyakfir Feb 3, 2026
21306f8
[54-fix-task-failure-handling-when-task-is-not-a-signature] - we crea…
yedidyakfir Feb 3, 2026
f0c9d45
[54-fix-task-failure-handling-when-task-is-not-a-signature] - we set …
yedidyakfir Feb 3, 2026
cb7bf4a
[54-fix-task-failure-handling-when-task-is-not-a-signature] - done an…
yedidyakfir Feb 3, 2026
77d8301
[54-fix-task-failure-handling-when-task-is-not-a-signature] - unused …
yedidyakfir Feb 3, 2026
81b45b0
[54-fix-task-failure-handling-when-task-is-not-a-signature] - remove …
yedidyakfir Feb 3, 2026
9704833
[54-fix-task-failure-handling-when-task-is-not-a-signature] - fix sig…
yedidyakfir Feb 3, 2026
32ed5c4
[54-fix-task-failure-handling-when-task-is-not-a-signature] - test ca…
yedidyakfir Feb 3, 2026
31a8401
[54-fix-task-failure-handling-when-task-is-not-a-signature] - we dont…
yedidyakfir Feb 3, 2026
b6e2a47
[54-fix-task-failure-handling-when-task-is-not-a-signature] - optimize
yedidyakfir Feb 3, 2026
93fdab4
[54-fix-task-failure-handling-when-task-is-not-a-signature] - added t…
yedidyakfir Feb 3, 2026
489471d
[54-fix-task-failure-handling-when-task-is-not-a-signature] - no more…
yedidyakfir Feb 3, 2026
37e2356
[54-fix-task-failure-handling-when-task-is-not-a-signature] - added s…
yedidyakfir Feb 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/bandit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

# Bandit is a security linter designed to find common security issues in Python code.
# This action will run Bandit on your codebase.
# The results of the scan will be found under the Security tab of your repository.

# https://github.com/marketplace/actions/bandit-scan is ISC licensed, by abirismyname
# https://pypi.org/project/bandit/ is Apache v2.0 licensed, by PyCQA

name: Bandit
on:
push:
branches: [ "main", "develop" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "main" ]
schedule:
- cron: '31 8 * * 5'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
bandit:
permissions:
contents: read # for actions/checkout to fetch code
security-events: write # for github/codeql-action/upload-sarif to upload SARIF results
actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status

runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Bandit Scan
uses: shundor/python-bandit-scan@ab1d87dfccc5a0ffab88be3aaac6ffe35c10d6cd
with:
exit_zero: true
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# Exclude test directories - B101 (assert_used) is valid in tests
excluded_paths: tests
# Skip B101 globally as asserts are valid for runtime checks too
skips: B101

10 changes: 7 additions & 3 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ name: "CodeQL Advanced"

on:
push:
branches: [ "main" ]
branches: [ "main", "develop" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "develop" ]
schedule:
- cron: '24 17 * * 1'
- cron: '45 2 * * 0'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
analyze:
Expand Down
51 changes: 51 additions & 0 deletions .github/workflows/pm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Close issues related to a merged pull request based on master branch.

on:
pull_request:
types: [closed]
branches:
- develop
- main

permissions:
issues: write

jobs:
closeIssueOnPrMergeTrigger:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest

steps:
- name: Closes issues related to a merged pull request.
uses: ldez/gha-mjolnir@v1.4.1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Extract issue number from branch name and close
uses: actions/github-script@v7
with:
script: |
const branchName = context.payload.pull_request.head.ref;
// Match patterns: feature/86-..., fix/123-..., 86-..., issue-86
const match = branchName.match(/(?:^|\/|-)(\d+)(?:-|$)/);
if (match) {
const issueNumber = parseInt(match[1]);
try {
const issue = await github.rest.issues.get({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: issueNumber
});
if (issue.data.state === 'open') {
await github.rest.issues.update({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: issueNumber,
state: 'closed',
state_reason: 'completed'
});
}
} catch (error) {
if (error.status !== 404) throw error;
}
}
89 changes: 89 additions & 0 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
name: Security Scan

on:
push:
branches: [main, develop]
pull_request:
branches: [main, develop]
schedule:
# Run weekly on Monday at 9am UTC
- cron: '0 9 * * 1'
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

permissions:
contents: read
security-events: write

jobs:
pip-audit:
name: Dependency Vulnerability Scan
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'

- name: Install tools
run: |
python -m pip install --upgrade pip pipx
pip install pip-audit
pipx install poetry
pipx inject poetry poetry-plugin-export

- name: Export and audit dependencies
run: |
poetry export -f requirements.txt --without-hashes -o requirements-audit.txt
pip-audit --strict -r requirements-audit.txt


secrets-scan:
name: Secrets Detection
environment:
name: security
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Run Gitleaks
uses: gitleaks/gitleaks-action@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITLEAKS_LICENSE: ${{ secrets.GITLEAKS_LICENSE }}

summary:
name: Security Summary
runs-on: ubuntu-latest
needs: [pip-audit, secrets-scan]
if: always()
steps:
- name: Check results
run: |
echo "## Security Scan Summary" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY

if [ "${{ needs.pip-audit.result }}" == "success" ]; then
echo "✅ pip-audit: Passed" >> $GITHUB_STEP_SUMMARY
else
echo "❌ pip-audit: Failed" >> $GITHUB_STEP_SUMMARY
fi

if [ "${{ needs.secrets-scan.result }}" == "success" ]; then
echo "✅ Secrets scan: Passed" >> $GITHUB_STEP_SUMMARY
else
echo "❌ Secrets scan: Failed" >> $GITHUB_STEP_SUMMARY
fi

- name: Fail if any check failed
if: |
needs.pip-audit.result == 'failure' ||
needs.secrets-scan.result == 'failure'
run: exit 1
53 changes: 53 additions & 0 deletions .github/workflows/semgrep.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

# This workflow file requires a free account on Semgrep.dev to
# manage rules, file ignores, notifications, and more.
#
# See https://semgrep.dev/docs

name: Semgrep

on:
push:
branches: [ "main", "develop" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "main" ]
schedule:
- cron: '29 23 * * 6'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

permissions:
contents: read

jobs:
semgrep:
permissions:
contents: read # for actions/checkout to fetch code
security-events: write # for github/codeql-action/upload-sarif to upload SARIF results
actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status
name: Scan
runs-on: ubuntu-latest
steps:
# Checkout project source
- uses: actions/checkout@v4

# Scan code using project's configuration on https://semgrep.dev/manage
- uses: returntocorp/semgrep-action@fcd5ab7459e8d91cb1777481980d1b18b4fc6735
with:
publishToken: ${{ secrets.SEMGREP_APP_TOKEN }}
publishDeployment: ${{ secrets.SEMGREP_DEPLOYMENT_ID }}
generateSarif: "1"

# Upload SARIF file generated in previous step
- name: Upload SARIF file
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: semgrep.sarif
if: always()
12 changes: 8 additions & 4 deletions mageflow/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs):
# NOTE: This should not run, the task should cancel, but just in case
return {"Error": "Task should have been canceled"}
try:
is_normal_run = invoker.is_vanilla_run()
signature = await invoker.start_task()
if send_signature:
kwargs["signature"] = signature
Expand All @@ -50,16 +51,19 @@ async def wrapper(message: EmptyModel, ctx: Context, *args, **kwargs):
else:
result = await flexible_call(func, message, ctx, *args, **kwargs)
except (Exception, asyncio.CancelledError) as e:
if is_normal_run:
raise
if not task_model.should_retry(ctx.attempt_number, e):
await invoker.task_failed()
await signature.failed()
await invoker.run_error()
await invoker.remove_task(with_error=False)
raise
else:
if is_normal_run:
return result
task_results = HatchetResult(hatchet_results=result)
dumped_results = task_results.model_dump(mode="json")
await invoker.run_success(dumped_results["hatchet_results"])
await invoker.remove_task(with_success=False)
await invoker.task_success(dumped_results["hatchet_results"])
await signature.done()
if wrap_res:
return task_results
else:
Expand Down
12 changes: 2 additions & 10 deletions mageflow/invokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

from pydantic import BaseModel

from mageflow.signature.model import TaskSignature


class BaseInvoker(ABC):
@property
Expand All @@ -18,17 +16,11 @@ async def start_task(self):
pass

@abc.abstractmethod
async def run_success(self, result: Any) -> bool:
pass

@abc.abstractmethod
async def run_error(self) -> bool:
async def task_success(self, result: Any):
pass

@abc.abstractmethod
async def remove_task(
self, with_success: bool = True, with_error: bool = True
) -> TaskSignature | None:
async def task_failed(self):
pass

@abc.abstractmethod
Expand Down
44 changes: 20 additions & 24 deletions mageflow/invokers/hatchet.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,50 @@ def __init__(self, message: BaseModel, ctx: Context):
def task_ctx(self) -> dict:
return self.task_data

@property
def task_id(self) -> str | None:
return self.task_data.get(TASK_ID_PARAM_NAME, None)

def is_vanilla_run(self):
return self.task_id is None

async def start_task(self) -> TaskSignature | None:
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
task_id = self.task_id
if task_id:
async with TaskSignature.alock_from_key(task_id) as signature:
await signature.change_status(SignatureStatus.ACTIVE)
await signature.task_status.aupdate(worker_task_id=self.workflow_id)
return signature
return None

async def run_success(self, result: Any) -> bool:
async def task_success(self, result: Any):
success_publish_tasks = []
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
task_id = self.task_id
if task_id:
current_task = await TaskSignature.get_safe(task_id)
task_success_workflows = current_task.activate_success(result)
await current_task.done()
success_publish_tasks.append(asyncio.create_task(task_success_workflows))

if success_publish_tasks:
await asyncio.gather(*success_publish_tasks)
return True
return False
if success_publish_tasks:
await asyncio.gather(*success_publish_tasks)

await current_task.remove(with_success=False)

async def run_error(self) -> bool:
async def task_failed(self):
error_publish_tasks = []
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
task_id = self.task_id
if task_id:
current_task = await TaskSignature.get_safe(task_id)
task_error_workflows = current_task.activate_error(self.message)
error_publish_tasks.append(asyncio.create_task(task_error_workflows))

if error_publish_tasks:
await asyncio.gather(*error_publish_tasks)
return True
return False
if error_publish_tasks:
await asyncio.gather(*error_publish_tasks)

async def remove_task(
self, with_success: bool = True, with_error: bool = True
) -> TaskSignature | None:
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
if task_id:
signature = await TaskSignature.get_safe(task_id)
if signature:
await signature.remove(with_error, with_success)
return None
await current_task.remove(with_error=False)

async def should_run_task(self) -> bool:
task_id = self.task_data.get(TASK_ID_PARAM_NAME, None)
task_id = self.task_id
if task_id:
signature = await TaskSignature.get_safe(task_id)
if signature is None:
Expand Down
Loading