Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions docs/reference-docs/auth-backend-and-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ WFO provides authentication based on an OIDC provider. The OIDC provider is pres

#### Frontend

The WFO frontend uses [NextAuth](3) to handle authentication. Authentication configuration can be found in [page/api/auth/[...nextauth].ts](4)
The WFO frontend uses [NextAuth][3] to handle authentication. Authentication configuration can be found in [page/api/auth/[...nextauth].ts][4]

**ENV variables**
These variables need to be set for authentication to work on the frontend.
Expand All @@ -56,7 +56,7 @@ NEXTAUTH_SECRET=[SECRET] // Used by NextAuth to encrypt the JWT token

With authentication turned on and these variables provided the frontend application will redirect unauthorized users to the login screen provided by the OIDC provider to request their credentials and return them to the page they tried to visit.

Note: It's possible to add additional oidc providers including some that are provided by the NextAuth library like Google, Apple and others. See [NextAuthProviders](5) for more information.
Note: It's possible to add additional oidc providers including some that are provided by the NextAuth library like Google, Apple and others. See [NextAuthProviders][5] for more information.

##### Authorization

Expand Down Expand Up @@ -219,7 +219,9 @@ class GraphqlAuthorization(ABC):

```

Graphql Authorization decisions can be made based on request properties and user attributes
Graphql Authorization decisions can be made based on request properties and user attributes.

[Additional methods](#authorization-for-internal-workflows) exist for defining role-based access control for internal workflows.

### Example

Expand Down Expand Up @@ -525,6 +527,24 @@ are prioritized in different workflow and inputstep configurations.
</tbody>
</table>

### Authorization for internal workflows
Users of Workflow Orchestrator can't directly access the `@workflow` decorators of tasks and workflows defined within `orchestrator-core`.
However, authorization callbacks can still be passed via the `OrchestratorCore` class when initializing your WFO application.

```python
from orchestrator import OrchestratorCore

app = OrchestratorCore()

# Let foo and bar be Authorizers
app.register_internal_authorize_callback(foo)
app.register_internal_retry_auth_callback(bar)
```

If these callbacks are not registered, these workflows can be started and retried by all users by default.

For more on application startup, see the [Settings Overview page][settings-overview].

### Examples
Assume we have the following function that can be used to create callbacks:

Expand Down Expand Up @@ -590,6 +610,8 @@ We can now construct a variety of authorization policies.

Note that we could specify `auth=allow_roles("user")` if helpful, or we can omit `auth` to fail open to any logged in user.

[settings-overview]: ../../reference-docs/app/settings-overview

[1]: https://github.com/workfloworchestrator/example-orchestrator-ui
[2]: https://github.com/workfloworchestrator/example-orchestrator
[3]: https://next-auth.js.org/
Expand Down
35 changes: 34 additions & 1 deletion orchestrator/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
from orchestrator.log_config import LOGGER_OVERRIDES
from orchestrator.metrics import ORCHESTRATOR_METRICS_REGISTRY, initialize_default_metrics
from orchestrator.services.process_broadcast_thread import ProcessDataBroadcastThread
from orchestrator.settings import AppSettings, ExecutorType, app_settings
from orchestrator.settings import AppSettings, ExecutorType, app_settings, get_authorizers
from orchestrator.utils.auth import Authorizer
from orchestrator.version import GIT_COMMIT_HASH
from orchestrator.websocket import init_websocket_manager
from pydantic_forms.exception_handlers.fastapi import form_error_handler
Expand Down Expand Up @@ -311,6 +312,38 @@ def register_graphql_authorization(self, graphql_authorization_instance: Graphql
"""
self.auth_manager.graphql_authorization = graphql_authorization_instance

def register_internal_authorize_callback(self, callback: Authorizer) -> None:
"""Registers the authorize_callback for WFO's internal workflows and tasks.

Since RBAC policies are applied to workflows via decorator, this enables registration of callbacks
for workflows defined in orchestrator-core itself.
However, this assignment MUST be made before any workflows are run.

Args:
callback (Authorizer): The async Authorizer to run for the `authorize_callback` argument of internal workflows.

Returns:
None
"""
authorizers = get_authorizers()
authorizers.internal_authorize_callback = callback

def register_internal_retry_auth_callback(self, callback: Authorizer) -> None:
"""Registers the retry_auth_callback for WFO's internal workflows and tasks.

Since RBAC policies are applied to workflows via decorator, this enables registration of callbacks
for workflows defined in orchestrator-core itself.
However, this assignment MUST be made before any workflows are run.

Args:
callback (Authorizer): The async Authorizer to run for the `retry_auth_callback` argument of internal workflows.

Returns:
None
"""
authorizers = get_authorizers()
authorizers.internal_retry_auth_callback = callback


main_typer_app = typer.Typer()
main_typer_app.add_typer(cli_app, name="orchestrator", help="The orchestrator CLI commands")
Expand Down
4 changes: 2 additions & 2 deletions orchestrator/graphql/schemas/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ async def user_permissions(self, info: OrchestratorInfo) -> FormUserPermissionsT
auth_resume, auth_retry = get_auth_callbacks(get_steps_to_evaluate_for_rbac(process), workflow)

return FormUserPermissionsType(
retryAllowed=bool(auth_retry and auth_retry(oidc_user)),
resumeAllowed=bool(auth_resume and auth_resume(oidc_user)),
retryAllowed=bool(auth_retry and await auth_retry(oidc_user)),
resumeAllowed=bool(auth_resume and await auth_resume(oidc_user)),
Comment on lines +92 to +93
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See also #1294

)

@authenticated_field(description="Returns list of subscriptions of the process") # type: ignore
Expand Down
48 changes: 48 additions & 0 deletions orchestrator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
from typing import Literal

from pydantic import Field, NonNegativeInt, PostgresDsn, RedisDsn
from pydantic.main import BaseModel
from pydantic_settings import BaseSettings

from oauth2_lib.fastapi import OIDCUserModel
from oauth2_lib.settings import oauth2lib_settings
from orchestrator.services.settings_env_variables import expose_settings
from orchestrator.utils.auth import Authorizer
from orchestrator.utils.expose_settings import SecretStr as OrchSecretStr
from pydantic_forms.types import strEnum

Expand Down Expand Up @@ -111,3 +114,48 @@ class AppSettings(BaseSettings):
expose_settings("app_settings", app_settings) # type: ignore
if app_settings.EXPOSE_OAUTH_SETTINGS:
expose_settings("oauth2lib_settings", oauth2lib_settings) # type: ignore


class Authorizers(BaseModel):
# Callbacks specifically for orchestrator-core callbacks.
# Separate from defaults for user-defined workflows and steps.
internal_authorize_callback: Authorizer | None = None
internal_retry_auth_callback: Authorizer | None = None

async def authorize_callback(self, user: OIDCUserModel | None) -> bool:
"""This is the authorize_callback to be registered for workflows defined within orchestrator-core.

If Authorizers.internal_authorize_callback is None, this function will return True.
i.e. any user will be authorized to start internal workflows.
"""
if self.internal_authorize_callback is None:
return True
return await self.internal_authorize_callback(user)

async def retry_auth_callback(self, user: OIDCUserModel | None) -> bool:
"""This is the retry_auth_callback to be registered for workflows defined within orchestrator-core.

If Authorizers.internal_retry_auth_callback is None, this function will return True.
i.e. any user will be authorized to retry internal workflows on failure.
"""
if self.internal_retry_auth_callback is None:
return True
return await self.internal_retry_auth_callback(user)


_authorizers = Authorizers()


def get_authorizers() -> Authorizers:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using this to reduce footgun opportunities since this is security-focused. On the other hand, it doesn't match how we handle app_settings, so I'm open to not going this route if folks have strong opinions about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the this need to be registered from the app the same way we have app.register_authorization

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that this doesn't belong to the settings class.

What I do think may cause confusion for end users is the AuthManager class which is defined in the oauth2_lib library:

class AuthManager:
    """Manages the authentication and authorization mechanisms for the application.

    This manager class orchestrates authentication and authorization, utilizing OpenID Connect (OIDC) and
    Open Policy Agent (OPA) respectively. It serves as a central hub for user authentication states and
    authorization policies. If defaults are insufficient, users can register alternatives or customize existing ones.
    ...."""

An instance of that class is created in the OrchestratorCore(FastAPI) class, and in downstream apps the authn/authz instances can be overridden like so:

    app.register_authentication(surf_authn)
    app.register_authorization(surf_authz)
    app.register_graphql_authorization(surf_graphql_authz)

The AuthManager's docstring suggests that it manages all authn/authz, but obviously this only holds for the API - the workflow authz is specifically for the Worker (which, in case of the threadpool executor, is running in the same process; but for the celery executor it will be separate).

I'm not suggesting we should make the Authorizers part of the AuthManager, maybe we can just name/document things well enough to avoid confusion. And perhaps tweak the AuthManager's docstring a little bit to say it's for the API.

Copy link
Contributor

@tjeerddie tjeerddie Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auth manager isn't specifically for the API, I would have used it in our version for the authorize_callback if we could use an async function. I'm also not saying that they need to be included in the AuthManager within oauth2-lib, since its orchestrator specific.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the workflow authz is specifically for the Worker

Just to clarify - the API endpoints for creating and resuming processes do check these callbacks, along with a couple other REST and GraphQL endpoints. I'm not actually aware of the worker running these checks; by checking in the request handler, we can prevent the workflow from ever making it to the queue if the process shouldn't be queued/re-queued.

I originally considered adding OrchestratorCore.register_internal_authorization_callbacks(authorize_callback=None, retry_auth_callback=None). I decided against it at the time since the process of checking this info seems kinda independent of the app from the perspective of building orch-core. Something-something loose coupling. That said, I can see it being more intuitive for the end user to configure it as part of the application.

Should I add it to OrchestratorCore class, or just continue with the current approach while making it clear that AuthManager is a separate thing in the docs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I said it wrong, yes the API is the only one that does auth checks and the worker shouldn't because of the prevention in the API.
I would add the register_internal_authorization_callbacks and maybe split it to a separate register for both.

"""Acquire singleton of app authorizers to assign these callbacks at app setup.

Ensures downstream users can acquire singleton without being tempted to do
from orchestrator.settings import authorizers
authorizers = my_authorizers
or
from orchestrator import settings
settings.authorizers = my_authorizers

...each of which goes wrong in its own way.
"""
return _authorizers
11 changes: 10 additions & 1 deletion orchestrator/workflows/modify_note.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from orchestrator.db import db
from orchestrator.forms import SubmitFormPage
from orchestrator.services import subscriptions
from orchestrator.settings import get_authorizers
from orchestrator.targets import Target
from orchestrator.utils.json import to_serializable
from orchestrator.workflow import StepList, done, init, step, workflow
Expand All @@ -21,6 +22,8 @@
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import LongText

authorizers = get_authorizers()


def initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
subscription = subscriptions.get_subscription(subscription_id)
Expand Down Expand Up @@ -51,6 +54,12 @@ def store_subscription_note(subscription_id: UUIDstr, note: str) -> State:
}


@workflow("Modify Note", initial_input_form=wrap_modify_initial_input_form(initial_input_form), target=Target.MODIFY)
@workflow(
"Modify Note",
initial_input_form=wrap_modify_initial_input_form(initial_input_form),
target=Target.MODIFY,
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def modify_note() -> StepList:
return init >> store_process_subscription() >> store_subscription_note >> done
9 changes: 8 additions & 1 deletion orchestrator/workflows/removed_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@
# limitations under the License.


from orchestrator.settings import get_authorizers
from orchestrator.workflow import StepList, workflow

authorizers = get_authorizers()


# This workflow has been made to create the initial import process for a SN7 subscription
# it does not do anything but is needed for the correct showing in the GUI.
@workflow("Dummy workflow to replace removed workflows")
@workflow(
"Dummy workflow to replace removed workflows",
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def removed_workflow() -> StepList:
return StepList()
11 changes: 9 additions & 2 deletions orchestrator/workflows/tasks/cleanup_tasks_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
from sqlalchemy import select

from orchestrator.db import ProcessTable, db
from orchestrator.settings import app_settings
from orchestrator.settings import app_settings, get_authorizers
from orchestrator.targets import Target
from orchestrator.utils.datetime import nowtz
from orchestrator.workflow import ProcessStatus, StepList, done, init, step, workflow
from pydantic_forms.types import State

authorizers = get_authorizers()


@step("Clean up completed tasks older than TASK_LOG_RETENTION_DAYS")
def remove_tasks() -> State:
Expand All @@ -41,6 +43,11 @@ def remove_tasks() -> State:
return {"tasks_removed": count}


@workflow("Clean up old tasks", target=Target.SYSTEM)
@workflow(
"Clean up old tasks",
target=Target.SYSTEM,
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def task_clean_up_tasks() -> StepList:
return init >> remove_tasks >> done
4 changes: 4 additions & 0 deletions orchestrator/workflows/tasks/resume_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

from orchestrator.db import ProcessTable, db
from orchestrator.services import processes
from orchestrator.settings import get_authorizers
from orchestrator.targets import Target
from orchestrator.workflow import ProcessStatus, StepList, done, init, step, workflow
from pydantic_forms.types import State, UUIDstr

authorizers = get_authorizers()
logger = structlog.get_logger(__name__)


Expand Down Expand Up @@ -110,6 +112,8 @@ def restart_created_workflows(created_state_process_ids: list[UUIDstr]) -> State
@workflow(
"Resume all workflows that are stuck on tasks with the status 'waiting', 'created' or 'resumed'",
target=Target.SYSTEM,
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def task_resume_workflows() -> StepList:
return init >> find_waiting_workflows >> resume_found_workflows >> restart_created_workflows >> done
8 changes: 7 additions & 1 deletion orchestrator/workflows/tasks/validate_product_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
get_validation_product_workflows_for_subscription,
start_validation_workflow_for_workflows,
)
from orchestrator.settings import get_authorizers
from orchestrator.targets import Target
from orchestrator.workflow import StepList, done, init, step, workflow
from pydantic_forms.types import FormGenerator, State

authorizers = get_authorizers()
logger = structlog.get_logger(__name__)


Expand Down Expand Up @@ -86,7 +88,11 @@ def validate_product_type(product_type: str) -> State:


@workflow(
"Validate all subscriptions of Product Type", target=Target.SYSTEM, initial_input_form=initial_input_form_generator
"Validate all subscriptions of Product Type",
target=Target.SYSTEM,
initial_input_form=initial_input_form_generator,
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def task_validate_product_type() -> StepList:
return init >> validate_product_type >> done
10 changes: 9 additions & 1 deletion orchestrator/workflows/tasks/validate_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
from orchestrator.services.products import get_products
from orchestrator.services.translations import generate_translations
from orchestrator.services.workflows import get_workflow_by_name, get_workflows
from orchestrator.settings import get_authorizers
from orchestrator.targets import Target
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.fixed_inputs import fixed_input_configuration as fi_configuration
from orchestrator.workflow import StepList, done, init, step, workflow
from pydantic_forms.types import State

authorizers = get_authorizers()

# Since these errors are probably programming failures we should not throw AssertionErrors


Expand Down Expand Up @@ -187,7 +190,12 @@ def check_subscription_models() -> State:
return {"check_subscription_models": True}


@workflow("Validate products", target=Target.SYSTEM)
@workflow(
"Validate products",
target=Target.SYSTEM,
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def task_validate_products() -> StepList:
return (
init
Expand Down
11 changes: 9 additions & 2 deletions orchestrator/workflows/tasks/validate_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
get_validation_product_workflows_for_subscription,
start_validation_workflow_for_workflows,
)
from orchestrator.settings import app_settings
from orchestrator.settings import app_settings, get_authorizers
from orchestrator.targets import Target
from orchestrator.workflow import StepList, init, step, workflow

Expand All @@ -33,6 +33,8 @@

task_semaphore = BoundedSemaphore(value=2)

authorizers = get_authorizers()


@step("Validate subscriptions")
def validate_subscriptions() -> None:
Expand All @@ -56,6 +58,11 @@ def validate_subscriptions() -> None:
start_validation_workflow_for_workflows(subscription=subscription, workflows=validation_product_workflows)


@workflow("Validate subscriptions", target=Target.SYSTEM)
@workflow(
"Validate subscriptions",
target=Target.SYSTEM,
authorize_callback=authorizers.authorize_callback,
retry_auth_callback=authorizers.retry_auth_callback,
)
def task_validate_subscriptions() -> StepList:
return init >> validate_subscriptions
Loading