From 1291dd93a0acc696b216fd2c4bc86c2f1c088d0c Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Tue, 18 Nov 2025 15:02:23 -0800 Subject: [PATCH 1/6] Add global default authorizers for core workflows --- orchestrator/settings.py | 27 +++++++++++++++++++ orchestrator/workflows/modify_note.py | 11 +++++++- orchestrator/workflows/removed_workflow.py | 9 ++++++- .../workflows/tasks/cleanup_tasks_log.py | 11 ++++++-- .../workflows/tasks/resume_workflows.py | 4 +++ .../workflows/tasks/validate_product_type.py | 8 +++++- .../workflows/tasks/validate_products.py | 10 ++++++- 7 files changed, 74 insertions(+), 6 deletions(-) diff --git a/orchestrator/settings.py b/orchestrator/settings.py index fdedc783b..fe3fc029a 100644 --- a/orchestrator/settings.py +++ b/orchestrator/settings.py @@ -17,10 +17,12 @@ from typing import Literal from pydantic import Field, NonNegativeInt, PostgresDsn, RedisDsn +from pydantic.main import BaseModel from pydantic_settings import BaseSettings from oauth2_lib.settings import oauth2lib_settings from orchestrator.services.settings_env_variables import expose_settings +from orchestrator.utils.auth import Authorizer from orchestrator.utils.expose_settings import SecretStr as OrchSecretStr from pydantic_forms.types import strEnum @@ -111,3 +113,28 @@ class AppSettings(BaseSettings): expose_settings("app_settings", app_settings) # type: ignore if app_settings.EXPOSE_OAUTH_SETTINGS: expose_settings("oauth2lib_settings", oauth2lib_settings) # type: ignore + + +class Authorizers(BaseModel): + # Callbacks specifically for orchestrator-core callbacks. + # Separate from defaults for user-defined workflows and steps. + internal_authorize_callback: Authorizer | None = None + internal_retry_auth_callback: Authorizer | None = None + + +_authorizers = Authorizers() + + +def get_authorizers() -> Authorizers: + """Acquire singleton of app authorizers to assign these callbacks at app setup. + + Ensures downstream users can acquire singleton without being tempted to do + from orchestrator.settings import authorizers + authorizers = my_authorizers + or + from orchestrator import settings + settings.authorizers = my_authorizers + + ...each of which goes wrong in its own way. + """ + return _authorizers diff --git a/orchestrator/workflows/modify_note.py b/orchestrator/workflows/modify_note.py index 97cdaa483..916aa26ff 100644 --- a/orchestrator/workflows/modify_note.py +++ b/orchestrator/workflows/modify_note.py @@ -13,6 +13,7 @@ from orchestrator.db import db from orchestrator.forms import SubmitFormPage from orchestrator.services import subscriptions +from orchestrator.settings import get_authorizers from orchestrator.targets import Target from orchestrator.utils.json import to_serializable from orchestrator.workflow import StepList, done, init, step, workflow @@ -21,6 +22,8 @@ from pydantic_forms.types import FormGenerator, State, UUIDstr from pydantic_forms.validators import LongText +authorizers = get_authorizers() + def initial_input_form(subscription_id: UUIDstr) -> FormGenerator: subscription = subscriptions.get_subscription(subscription_id) @@ -51,6 +54,12 @@ def store_subscription_note(subscription_id: UUIDstr, note: str) -> State: } -@workflow("Modify Note", initial_input_form=wrap_modify_initial_input_form(initial_input_form), target=Target.MODIFY) +@workflow( + "Modify Note", + initial_input_form=wrap_modify_initial_input_form(initial_input_form), + target=Target.MODIFY, + authorize_callback=authorizers.internal_authorize_callback, + retry_auth_callback=authorizers.internal_retry_auth_callback, +) def modify_note() -> StepList: return init >> store_process_subscription() >> store_subscription_note >> done diff --git a/orchestrator/workflows/removed_workflow.py b/orchestrator/workflows/removed_workflow.py index 6b7090209..022a63bcf 100644 --- a/orchestrator/workflows/removed_workflow.py +++ b/orchestrator/workflows/removed_workflow.py @@ -12,11 +12,18 @@ # limitations under the License. +from orchestrator.settings import get_authorizers from orchestrator.workflow import StepList, workflow +authorizers = get_authorizers() + # This workflow has been made to create the initial import process for a SN7 subscription # it does not do anything but is needed for the correct showing in the GUI. -@workflow("Dummy workflow to replace removed workflows") +@workflow( + "Dummy workflow to replace removed workflows", + authorize_callback=authorizers.internal_authorize_callback, + retry_auth_callback=authorizers.internal_retry_auth_callback, +) def removed_workflow() -> StepList: return StepList() diff --git a/orchestrator/workflows/tasks/cleanup_tasks_log.py b/orchestrator/workflows/tasks/cleanup_tasks_log.py index b648724ec..2cc6c7cdb 100644 --- a/orchestrator/workflows/tasks/cleanup_tasks_log.py +++ b/orchestrator/workflows/tasks/cleanup_tasks_log.py @@ -17,12 +17,14 @@ from sqlalchemy import select from orchestrator.db import ProcessTable, db -from orchestrator.settings import app_settings +from orchestrator.settings import app_settings, get_authorizers from orchestrator.targets import Target from orchestrator.utils.datetime import nowtz from orchestrator.workflow import ProcessStatus, StepList, done, init, step, workflow from pydantic_forms.types import State +authorizers = get_authorizers() + @step("Clean up completed tasks older than TASK_LOG_RETENTION_DAYS") def remove_tasks() -> State: @@ -41,6 +43,11 @@ def remove_tasks() -> State: return {"tasks_removed": count} -@workflow("Clean up old tasks", target=Target.SYSTEM) +@workflow( + "Clean up old tasks", + target=Target.SYSTEM, + authorize_callback=authorizers.internal_authorize_callback, + retry_auth_callback=authorizers.internal_retry_auth_callback, +) def task_clean_up_tasks() -> StepList: return init >> remove_tasks >> done diff --git a/orchestrator/workflows/tasks/resume_workflows.py b/orchestrator/workflows/tasks/resume_workflows.py index d3fd3cb3f..0f6e02468 100644 --- a/orchestrator/workflows/tasks/resume_workflows.py +++ b/orchestrator/workflows/tasks/resume_workflows.py @@ -17,10 +17,12 @@ from orchestrator.db import ProcessTable, db from orchestrator.services import processes +from orchestrator.settings import get_authorizers from orchestrator.targets import Target from orchestrator.workflow import ProcessStatus, StepList, done, init, step, workflow from pydantic_forms.types import State, UUIDstr +authorizers = get_authorizers() logger = structlog.get_logger(__name__) @@ -110,6 +112,8 @@ def restart_created_workflows(created_state_process_ids: list[UUIDstr]) -> State @workflow( "Resume all workflows that are stuck on tasks with the status 'waiting', 'created' or 'resumed'", target=Target.SYSTEM, + authorize_callback=authorizers.internal_authorize_callback, + retry_auth_callback=authorizers.internal_retry_auth_callback, ) def task_resume_workflows() -> StepList: return init >> find_waiting_workflows >> resume_found_workflows >> restart_created_workflows >> done diff --git a/orchestrator/workflows/tasks/validate_product_type.py b/orchestrator/workflows/tasks/validate_product_type.py index c10ab9ad1..c2e68a245 100644 --- a/orchestrator/workflows/tasks/validate_product_type.py +++ b/orchestrator/workflows/tasks/validate_product_type.py @@ -25,10 +25,12 @@ get_validation_product_workflows_for_subscription, start_validation_workflow_for_workflows, ) +from orchestrator.settings import get_authorizers from orchestrator.targets import Target from orchestrator.workflow import StepList, done, init, step, workflow from pydantic_forms.types import FormGenerator, State +authorizers = get_authorizers() logger = structlog.get_logger(__name__) @@ -86,7 +88,11 @@ def validate_product_type(product_type: str) -> State: @workflow( - "Validate all subscriptions of Product Type", target=Target.SYSTEM, initial_input_form=initial_input_form_generator + "Validate all subscriptions of Product Type", + target=Target.SYSTEM, + initial_input_form=initial_input_form_generator, + authorize_callback=authorizers.internal_authorize_callback, + retry_auth_callback=authorizers.internal_retry_auth_callback, ) def task_validate_product_type() -> StepList: return init >> validate_product_type >> done diff --git a/orchestrator/workflows/tasks/validate_products.py b/orchestrator/workflows/tasks/validate_products.py index 8aa9f4b7c..d364a1471 100644 --- a/orchestrator/workflows/tasks/validate_products.py +++ b/orchestrator/workflows/tasks/validate_products.py @@ -26,12 +26,15 @@ from orchestrator.services.products import get_products from orchestrator.services.translations import generate_translations from orchestrator.services.workflows import get_workflow_by_name, get_workflows +from orchestrator.settings import get_authorizers from orchestrator.targets import Target from orchestrator.utils.errors import ProcessFailureError from orchestrator.utils.fixed_inputs import fixed_input_configuration as fi_configuration from orchestrator.workflow import StepList, done, init, step, workflow from pydantic_forms.types import State +authorizers = get_authorizers() + # Since these errors are probably programming failures we should not throw AssertionErrors @@ -187,7 +190,12 @@ def check_subscription_models() -> State: return {"check_subscription_models": True} -@workflow("Validate products", target=Target.SYSTEM) +@workflow( + "Validate products", + target=Target.SYSTEM, + authorize_callback=authorizers.internal_authorize_callback, + retry_auth_callback=authorizers.internal_retry_auth_callback, +) def task_validate_products() -> StepList: return ( init From a073b37f711cba355d62125b68eb44f542f79e54 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Thu, 11 Dec 2025 12:48:12 -0800 Subject: [PATCH 2/6] Refactor callback registration per review --- orchestrator/app.py | 35 ++++++++++++++++++- orchestrator/settings.py | 21 +++++++++++ orchestrator/workflows/modify_note.py | 4 +-- orchestrator/workflows/removed_workflow.py | 4 +-- .../workflows/tasks/cleanup_tasks_log.py | 4 +-- .../workflows/tasks/resume_workflows.py | 4 +-- .../workflows/tasks/validate_product_type.py | 4 +-- .../workflows/tasks/validate_products.py | 4 +-- .../workflows/tasks/validate_subscriptions.py | 11 ++++-- 9 files changed, 76 insertions(+), 15 deletions(-) diff --git a/orchestrator/app.py b/orchestrator/app.py index 36c0177f6..b9317a52a 100644 --- a/orchestrator/app.py +++ b/orchestrator/app.py @@ -57,7 +57,8 @@ from orchestrator.log_config import LOGGER_OVERRIDES from orchestrator.metrics import ORCHESTRATOR_METRICS_REGISTRY, initialize_default_metrics from orchestrator.services.process_broadcast_thread import ProcessDataBroadcastThread -from orchestrator.settings import AppSettings, ExecutorType, app_settings +from orchestrator.settings import AppSettings, ExecutorType, app_settings, get_authorizers +from orchestrator.utils.auth import Authorizer from orchestrator.version import GIT_COMMIT_HASH from orchestrator.websocket import init_websocket_manager from pydantic_forms.exception_handlers.fastapi import form_error_handler @@ -311,6 +312,38 @@ def register_graphql_authorization(self, graphql_authorization_instance: Graphql """ self.auth_manager.graphql_authorization = graphql_authorization_instance + def register_internal_authorize_callback(self, callback: Authorizer) -> None: + """Registers the authorize_callback for WFO's internal workflows and tasks. + + Since RBAC policies are applied to workflows via decorator, this enables registration of callbacks + for workflows defined in orchestrator-core itself. + However, this assignment MUST be made before any workflows are run. + + Args: + callback (Authorizer): The async Authorizer to run for the `authorize_callback` argument of internal workflows. + + Returns: + None + """ + authorizers = get_authorizers() + authorizers.internal_authorize_callback = callback + + def register_internal_retry_auth_callback(self, callback: Authorizer) -> None: + """Registers the retry_auth_callback for WFO's internal workflows and tasks. + + Since RBAC policies are applied to workflows via decorator, this enables registration of callbacks + for workflows defined in orchestrator-core itself. + However, this assignment MUST be made before any workflows are run. + + Args: + callback (Authorizer): The async Authorizer to run for the `retry_auth_callback` argument of internal workflows. + + Returns: + None + """ + authorizers = get_authorizers() + authorizers.internal_retry_auth_callback = callback + main_typer_app = typer.Typer() main_typer_app.add_typer(cli_app, name="orchestrator", help="The orchestrator CLI commands") diff --git a/orchestrator/settings.py b/orchestrator/settings.py index fe3fc029a..9decf5596 100644 --- a/orchestrator/settings.py +++ b/orchestrator/settings.py @@ -20,6 +20,7 @@ from pydantic.main import BaseModel from pydantic_settings import BaseSettings +from oauth2_lib.fastapi import OIDCUserModel from oauth2_lib.settings import oauth2lib_settings from orchestrator.services.settings_env_variables import expose_settings from orchestrator.utils.auth import Authorizer @@ -121,6 +122,26 @@ class Authorizers(BaseModel): internal_authorize_callback: Authorizer | None = None internal_retry_auth_callback: Authorizer | None = None + async def authorize_callback(self, user: OIDCUserModel | None) -> bool: + """This is the authorize_callback to be registered for workflows defined within orchestrator-core. + + If Authorizers.internal_authorize_callback is None, this function will return True. + i.e. any user will be authorized to start internal workflows. + """ + if self.internal_authorize_callback is None: + return True + return await self.internal_authorize_callback(user) + + async def retry_auth_callback(self, user: OIDCUserModel | None) -> bool: + """This is the retry_auth_callback to be registered for workflows defined within orchestrator-core. + + If Authorizers.internal_retry_auth_callback is None, this function will return True. + i.e. any user will be authorized to retry internal workflows on failure. + """ + if self.internal_retry_auth_callback is None: + return True + return await self.internal_retry_auth_callback(user) + _authorizers = Authorizers() diff --git a/orchestrator/workflows/modify_note.py b/orchestrator/workflows/modify_note.py index 916aa26ff..aa712fefb 100644 --- a/orchestrator/workflows/modify_note.py +++ b/orchestrator/workflows/modify_note.py @@ -58,8 +58,8 @@ def store_subscription_note(subscription_id: UUIDstr, note: str) -> State: "Modify Note", initial_input_form=wrap_modify_initial_input_form(initial_input_form), target=Target.MODIFY, - authorize_callback=authorizers.internal_authorize_callback, - retry_auth_callback=authorizers.internal_retry_auth_callback, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, ) def modify_note() -> StepList: return init >> store_process_subscription() >> store_subscription_note >> done diff --git a/orchestrator/workflows/removed_workflow.py b/orchestrator/workflows/removed_workflow.py index 022a63bcf..50ea6b3d2 100644 --- a/orchestrator/workflows/removed_workflow.py +++ b/orchestrator/workflows/removed_workflow.py @@ -22,8 +22,8 @@ # it does not do anything but is needed for the correct showing in the GUI. @workflow( "Dummy workflow to replace removed workflows", - authorize_callback=authorizers.internal_authorize_callback, - retry_auth_callback=authorizers.internal_retry_auth_callback, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, ) def removed_workflow() -> StepList: return StepList() diff --git a/orchestrator/workflows/tasks/cleanup_tasks_log.py b/orchestrator/workflows/tasks/cleanup_tasks_log.py index 2cc6c7cdb..25a033761 100644 --- a/orchestrator/workflows/tasks/cleanup_tasks_log.py +++ b/orchestrator/workflows/tasks/cleanup_tasks_log.py @@ -46,8 +46,8 @@ def remove_tasks() -> State: @workflow( "Clean up old tasks", target=Target.SYSTEM, - authorize_callback=authorizers.internal_authorize_callback, - retry_auth_callback=authorizers.internal_retry_auth_callback, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, ) def task_clean_up_tasks() -> StepList: return init >> remove_tasks >> done diff --git a/orchestrator/workflows/tasks/resume_workflows.py b/orchestrator/workflows/tasks/resume_workflows.py index 0f6e02468..f544fc3d5 100644 --- a/orchestrator/workflows/tasks/resume_workflows.py +++ b/orchestrator/workflows/tasks/resume_workflows.py @@ -112,8 +112,8 @@ def restart_created_workflows(created_state_process_ids: list[UUIDstr]) -> State @workflow( "Resume all workflows that are stuck on tasks with the status 'waiting', 'created' or 'resumed'", target=Target.SYSTEM, - authorize_callback=authorizers.internal_authorize_callback, - retry_auth_callback=authorizers.internal_retry_auth_callback, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, ) def task_resume_workflows() -> StepList: return init >> find_waiting_workflows >> resume_found_workflows >> restart_created_workflows >> done diff --git a/orchestrator/workflows/tasks/validate_product_type.py b/orchestrator/workflows/tasks/validate_product_type.py index c2e68a245..08c3c4e5c 100644 --- a/orchestrator/workflows/tasks/validate_product_type.py +++ b/orchestrator/workflows/tasks/validate_product_type.py @@ -91,8 +91,8 @@ def validate_product_type(product_type: str) -> State: "Validate all subscriptions of Product Type", target=Target.SYSTEM, initial_input_form=initial_input_form_generator, - authorize_callback=authorizers.internal_authorize_callback, - retry_auth_callback=authorizers.internal_retry_auth_callback, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, ) def task_validate_product_type() -> StepList: return init >> validate_product_type >> done diff --git a/orchestrator/workflows/tasks/validate_products.py b/orchestrator/workflows/tasks/validate_products.py index d364a1471..e0626bb90 100644 --- a/orchestrator/workflows/tasks/validate_products.py +++ b/orchestrator/workflows/tasks/validate_products.py @@ -193,8 +193,8 @@ def check_subscription_models() -> State: @workflow( "Validate products", target=Target.SYSTEM, - authorize_callback=authorizers.internal_authorize_callback, - retry_auth_callback=authorizers.internal_retry_auth_callback, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, ) def task_validate_products() -> StepList: return ( diff --git a/orchestrator/workflows/tasks/validate_subscriptions.py b/orchestrator/workflows/tasks/validate_subscriptions.py index a9ad92197..af9d7fbd7 100644 --- a/orchestrator/workflows/tasks/validate_subscriptions.py +++ b/orchestrator/workflows/tasks/validate_subscriptions.py @@ -24,7 +24,7 @@ get_validation_product_workflows_for_subscription, start_validation_workflow_for_workflows, ) -from orchestrator.settings import app_settings +from orchestrator.settings import app_settings, get_authorizers from orchestrator.targets import Target from orchestrator.workflow import StepList, init, step, workflow @@ -33,6 +33,8 @@ task_semaphore = BoundedSemaphore(value=2) +authorizers = get_authorizers() + @step("Validate subscriptions") def validate_subscriptions() -> None: @@ -56,6 +58,11 @@ def validate_subscriptions() -> None: start_validation_workflow_for_workflows(subscription=subscription, workflows=validation_product_workflows) -@workflow("Validate subscriptions", target=Target.SYSTEM) +@workflow( + "Validate subscriptions", + target=Target.SYSTEM, + authorize_callback=authorizers.authorize_callback, + retry_auth_callback=authorizers.retry_auth_callback, +) def task_validate_subscriptions() -> StepList: return init >> validate_subscriptions From 7b6be82a89ec520621bb44f91e1ad46eff11c26a Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Thu, 11 Dec 2025 13:47:20 -0800 Subject: [PATCH 3/6] Update docs and fix broken links in same document --- .../auth-backend-and-frontend.md | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/docs/reference-docs/auth-backend-and-frontend.md b/docs/reference-docs/auth-backend-and-frontend.md index 575a1f18b..7f5843230 100644 --- a/docs/reference-docs/auth-backend-and-frontend.md +++ b/docs/reference-docs/auth-backend-and-frontend.md @@ -34,7 +34,7 @@ WFO provides authentication based on an OIDC provider. The OIDC provider is pres #### Frontend -The WFO frontend uses [NextAuth](3) to handle authentication. Authentication configuration can be found in [page/api/auth/[...nextauth].ts](4) +The WFO frontend uses [NextAuth][3] to handle authentication. Authentication configuration can be found in [page/api/auth/[...nextauth].ts][4] **ENV variables** These variables need to be set for authentication to work on the frontend. @@ -56,7 +56,7 @@ NEXTAUTH_SECRET=[SECRET] // Used by NextAuth to encrypt the JWT token With authentication turned on and these variables provided the frontend application will redirect unauthorized users to the login screen provided by the OIDC provider to request their credentials and return them to the page they tried to visit. -Note: It's possible to add additional oidc providers including some that are provided by the NextAuth library like Google, Apple and others. See [NextAuthProviders](5) for more information. +Note: It's possible to add additional oidc providers including some that are provided by the NextAuth library like Google, Apple and others. See [NextAuthProviders][5] for more information. ##### Authorization @@ -219,7 +219,9 @@ class GraphqlAuthorization(ABC): ``` -Graphql Authorization decisions can be made based on request properties and user attributes +Graphql Authorization decisions can be made based on request properties and user attributes. + +[Additional methods](#authorization-for-internal-workflows) exist for defining role-based access control for internal workflows. ### Example @@ -525,6 +527,24 @@ are prioritized in different workflow and inputstep configurations. +### Authorization for internal workflows +Users of Workflow Orchestrator can't directly access the `@workflow` decorators of tasks and workflows defined within `orchestrator-core`. +However, authorization callbacks can still be passed via the `OrchestratorCore` class when initializing your WFO application. + +```python +from orchestrator import OrchestratorCore + +app = OrchestratorCore() + +# Let foo and bar be Authorizers +app.register_internal_authorize_callback(foo) +app.register_internal_retry_auth_callback(bar) +``` + +If these callbacks are not registered, these workflows can be started and retried by all users by default. + +For more on application startup, see the [Settings Overview page][settings-overview]. + ### Examples Assume we have the following function that can be used to create callbacks: @@ -590,6 +610,8 @@ We can now construct a variety of authorization policies. Note that we could specify `auth=allow_roles("user")` if helpful, or we can omit `auth` to fail open to any logged in user. +[settings-overview]: ../../reference-docs/app/settings-overview + [1]: https://github.com/workfloworchestrator/example-orchestrator-ui [2]: https://github.com/workfloworchestrator/example-orchestrator [3]: https://next-auth.js.org/ From 44ed3c49cf3aeef484b4baa7295e6bebebe96dee Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Thu, 11 Dec 2025 15:18:22 -0800 Subject: [PATCH 4/6] Update tests --- test/unit_tests/api/test_processes.py | 69 +++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 32d011467..dbec43b90 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -38,6 +38,7 @@ step_group, workflow, ) +from orchestrator.workflows.tasks.cleanup_tasks_log import task_clean_up_tasks from pydantic_forms.core import FormPage from test.unit_tests.helpers import URL_STR_TYPE from test.unit_tests.workflows import WorkflowInstanceForTests @@ -1087,3 +1088,71 @@ def test_authorized_retrystep_retry(test_client, process_on_retriable_retrystep) def test_unauthorized_retrystep_retry(test_client, process_on_unretriable_retrystep): response = test_client.put(f"/api/processes/{process_on_unretriable_retrystep}/resume", json={}) assert HTTPStatus.FORBIDDEN == response.status_code + + +def test_internal_authorize_callback(test_client, fastapi_app): + """Test RBAC callbacks can restrict access to internal workflows.""" + + async def disallow(_: OIDCUserModel | None = None) -> bool: + return False + + with mock.patch("orchestrator.api.api_v1.endpoints.processes.start_process") as mock_start_process: + # Just return a bogus UUID instead of actually starting a process. + mock_start_process.return_value = uuid4() + + # Test that we succeed in the default case (no authorizer) + response = test_client.post("/api/processes/task_clean_up_tasks", json=[{}]) + assert HTTPStatus.CREATED == response.status_code + + # Test that disallow now blocks us + fastapi_app.register_internal_authorize_callback(disallow) + response = test_client.post("/api/processes/task_clean_up_tasks", json=[{}]) + assert HTTPStatus.FORBIDDEN == response.status_code + + +@pytest.fixture +def internal_process_on_retry_step(): + """A task_clean_up_tasks process stuck on a failed step.""" + # Don't know the UUID of task_clean_up_tasks at test time, so we temporarily register a copy of it. + with WorkflowInstanceForTests(task_clean_up_tasks, "task_clean_up_tasks_again") as wf: + process_id = uuid4() + process = ProcessTable( + process_id=process_id, + workflow_id=wf.workflow_id, + last_status=ProcessStatus.FAILED, + last_step="remove_tasks", + ) + init_step = ProcessStepTable(process_id=process_id, name="Start", status=StepStatus.SUCCESS, state={}) + failed_step = ProcessStepTable(process_id=process_id, name="remove_tasks", status=StepStatus.FAILED, state={}) + + db.session.add(process) + db.session.add(init_step) + db.session.add(failed_step) + db.session.commit() + + # Yield, not return, since we need the workflow to persist for the duration of the test. + yield process_id + + +def test_internal_retry_auth_callback(test_client, fastapi_app, internal_process_on_retry_step): + """Test that RBAC callbacks can manage access to retrying internal workflows.""" + + async def disallow(_: OIDCUserModel | None = None) -> bool: + return False + + async def allow(_: OIDCUserModel | None = None) -> bool: + return True + + with mock.patch("orchestrator.api.api_v1.endpoints.processes.start_process") as mock_start_process: + # Just return a bogus UUID instead of actually starting a process. + mock_start_process.return_value = uuid4() + + # Start with disallow. This should block us. + fastapi_app.register_internal_retry_auth_callback(disallow) + response = test_client.put(f"/api/processes/{internal_process_on_retry_step}/resume", json={}) + assert HTTPStatus.FORBIDDEN == response.status_code + + # Update to allow. This should succeed. + fastapi_app.register_internal_retry_auth_callback(allow) + response = test_client.put(f"/api/processes/{internal_process_on_retry_step}/resume", json={}) + assert HTTPStatus.NO_CONTENT == response.status_code From dfedaa88c4f1e7d0f34eac87b1cda5fc16b93feb Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Fri, 12 Dec 2025 11:05:26 -0800 Subject: [PATCH 5/6] Fix unawaited async calls --- orchestrator/graphql/schemas/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrator/graphql/schemas/process.py b/orchestrator/graphql/schemas/process.py index 5c3b7d165..c39ba0446 100644 --- a/orchestrator/graphql/schemas/process.py +++ b/orchestrator/graphql/schemas/process.py @@ -89,8 +89,8 @@ async def user_permissions(self, info: OrchestratorInfo) -> FormUserPermissionsT auth_resume, auth_retry = get_auth_callbacks(get_steps_to_evaluate_for_rbac(process), workflow) return FormUserPermissionsType( - retryAllowed=bool(auth_retry and auth_retry(oidc_user)), - resumeAllowed=bool(auth_resume and auth_resume(oidc_user)), + retryAllowed=bool(auth_retry and await auth_retry(oidc_user)), + resumeAllowed=bool(auth_resume and await auth_resume(oidc_user)), ) @authenticated_field(description="Returns list of subscriptions of the process") # type: ignore From 0a4ee3b0afee4900b8a20ad871c3ecf35a18be92 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Fri, 12 Dec 2025 12:47:56 -0800 Subject: [PATCH 6/6] Wrap app fixture to ensure RBAC cleanup --- test/unit_tests/api/test_processes.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index dbec43b90..fffadd0b5 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -1090,7 +1090,20 @@ def test_unauthorized_retrystep_retry(test_client, process_on_unretriable_retrys assert HTTPStatus.FORBIDDEN == response.status_code -def test_internal_authorize_callback(test_client, fastapi_app): +@pytest.fixture +def fastapi_app_for_auth_callbacks(fastapi_app): + """Reset auth callbacks after each fixture use. + + Fixture fastapi_app has scope "session", so its cleanup won't run between tests. + This wraps the session fixture with a per-test fixture to ensure these callbacks are reset. + """ + yield fastapi_app + # Clear internal RBAC settings + fastapi_app.register_internal_authorize_callback(None) + fastapi_app.register_internal_retry_auth_callback(None) + + +def test_internal_authorize_callback(test_client, fastapi_app_for_auth_callbacks): """Test RBAC callbacks can restrict access to internal workflows.""" async def disallow(_: OIDCUserModel | None = None) -> bool: @@ -1105,7 +1118,7 @@ async def disallow(_: OIDCUserModel | None = None) -> bool: assert HTTPStatus.CREATED == response.status_code # Test that disallow now blocks us - fastapi_app.register_internal_authorize_callback(disallow) + fastapi_app_for_auth_callbacks.register_internal_authorize_callback(disallow) response = test_client.post("/api/processes/task_clean_up_tasks", json=[{}]) assert HTTPStatus.FORBIDDEN == response.status_code @@ -1134,7 +1147,7 @@ def internal_process_on_retry_step(): yield process_id -def test_internal_retry_auth_callback(test_client, fastapi_app, internal_process_on_retry_step): +def test_internal_retry_auth_callback(test_client, fastapi_app_for_auth_callbacks, internal_process_on_retry_step): """Test that RBAC callbacks can manage access to retrying internal workflows.""" async def disallow(_: OIDCUserModel | None = None) -> bool: @@ -1148,11 +1161,11 @@ async def allow(_: OIDCUserModel | None = None) -> bool: mock_start_process.return_value = uuid4() # Start with disallow. This should block us. - fastapi_app.register_internal_retry_auth_callback(disallow) + fastapi_app_for_auth_callbacks.register_internal_retry_auth_callback(disallow) response = test_client.put(f"/api/processes/{internal_process_on_retry_step}/resume", json={}) assert HTTPStatus.FORBIDDEN == response.status_code # Update to allow. This should succeed. - fastapi_app.register_internal_retry_auth_callback(allow) + fastapi_app_for_auth_callbacks.register_internal_retry_auth_callback(allow) response = test_client.put(f"/api/processes/{internal_process_on_retry_step}/resume", json={}) assert HTTPStatus.NO_CONTENT == response.status_code