diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 3155f8c..e63ae15 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -12,6 +12,9 @@ create_glossary_category_assets, create_glossary_assets, create_glossary_term_assets, + get_workflow_package_names, + get_workflows, + get_workflow_runs, create_data_domain_assets, create_data_product_assets, create_dq_rules, @@ -908,6 +911,253 @@ def create_glossary_categories(categories) -> List[Dict[str, Any]]: return create_glossary_category_assets(categories) +@mcp.tool() +def get_workflow_package_names_tool() -> List[str]: + """ + Get the values of all available workflow packages. + + This tool returns a list of all workflow package names that can be used + with other workflow tools. + + Returns: + List[str]: List of workflow package names (e.g., ['atlan-snowflake', 'atlan-bigquery', ...]) + + Examples: + # Get all workflow package names + packages = get_workflow_package_names_tool() + """ + return get_workflow_package_names() + + +@mcp.tool() +def get_workflows_tool( + id: str | None = None, + workflow_package_name: str | None = None, + max_results: int = 10, +) -> Dict[str, Any]: + """ + Retrieve workflows by ID or by package type. + + This tool supports two main use cases: + 1. Get a specific workflow by ID (returns full workflow details including steps and spec) + 2. Get multiple workflows by package type (returns metadata only, no full details) + + IMPORTANT: When getting by ID, this tool returns full workflow instructions with all steps and transformations. + When getting by type, this tool returns only basic metadata to avoid large responses. + + Note: This tool only returns workflows (templates), not workflow_runs. + To get workflow_run information, use get_workflow_runs_tool. 
+ + When to use this tool: + - Use when you need to get a specific workflow by ID with full details (steps, spec, transformations) + - Use when you need to list multiple workflows by package type (metadata only) + - Use when you need to understand the full workflow execution flow (by ID only) + - Do NOT use if you need workflow_run (execution) information + + Args: + id (str, optional): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). + If provided, returns a single workflow with full details (workflow_steps and workflow_spec). + Either id or workflow_package_name must be provided, but not both. + workflow_package_name (str, optional): The workflow package name (e.g., 'atlan-snowflake', 'atlan-bigquery', 'SNOWFLAKE'). + Can be the full package name like 'atlan-snowflake' or the enum name like 'SNOWFLAKE'. + If provided, returns list of workflows with metadata only (no workflow_steps or workflow_spec). + Either id or workflow_package_name must be provided, but not both. + max_results (int, optional): Maximum number of workflows to return when getting by type. Defaults to 10. + Only used when workflow_package_name is provided. + + Returns: + Dict[str, Any]: Dictionary containing: + - workflows: List of workflow dictionaries. When getting by ID, contains single item. 
+ Each workflow contains: + - template_name: Name of the template + - package_name, package_version: Package information + - source_system, source_category, workflow_type: Source information + - certified, verified: Certification status + - creator_email, creator_username: Creator information + - modifier_email, modifier_username: Last modifier information + - creation_timestamp: When template was created + - package_author, package_description, package_repository: Package metadata + - workflow_steps: Workflow steps (only when getting by ID) + - workflow_spec: Full workflow specification (only when getting by ID) + - total: Total count of workflows (1 when getting by ID, actual count when getting by type) + - error: None if no error occurred, otherwise the error message + + Examples: + # Get a specific workflow by ID (full details with steps and spec) + result = get_workflows_tool(id="atlan-snowflake-miner-1714638976") + workflow = result.get("workflows", [])[0] # Single workflow in list + # workflow contains complete workflow definition with workflow_steps and workflow_spec + + # Get Snowflake workflows by type (metadata only) + result = get_workflows_tool(workflow_package_name="atlan-snowflake") + workflows = result.get("workflows", []) # List of workflows + + # Get BigQuery workflows with custom limit + result = get_workflows_tool(workflow_package_name="atlan-bigquery", max_results=50) + + # Get workflows using enum name + result = get_workflows_tool(workflow_package_name="SNOWFLAKE") + """ + return get_workflows( + id=id, + workflow_package_name=workflow_package_name, + max_results=max_results, + ) + + +@mcp.tool() +def get_workflow_runs_tool( + workflow_name: str | None = None, + workflow_phase: str | None = None, + status: str + | None = None, # This is technically a list but we get the input as a string so we parse it into a list + started_at: str | None = None, + finished_at: str | None = None, + from_: int = 0, + size: int = 100, +) -> Dict[str, Any]: + 
""" + Retrieve workflow_runs with flexible filtering options. + + This tool supports two main use cases: + 1. Filter by specific workflow name and phase (single workflow) + 2. Filter across all workflows by status list and optional time ranges + + Each workflow_run contains execution details, timing, resource usage, and status information. + + IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow + instructions, steps, or transformations. To get complete workflow details including all steps and + transformations, use get_workflows_tool with an id parameter instead. + + When to use this tool: + - Use when you need to find all workflow_runs of a specific workflow by execution phase + → Provide workflow_name AND workflow_phase (do NOT use status) + - Use when you need to find workflow_runs across multiple workflows filtered by status and time + → Provide status (and optionally started_at/finished_at), do NOT provide workflow_name or workflow_phase + - Use when monitoring workflow execution patterns or analyzing failures + - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs + - Do NOT use if you need the complete workflow specification (use get_workflows_tool with id instead) + + Args: + workflow_name (str, optional): Name of the workflow (template) as displayed in the UI + (e.g., 'atlan-snowflake-miner-1714638976'). If provided, filters runs for that specific workflow. + This refers to the workflow name, not individual workflow_run IDs. + workflow_phase (str, optional): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. + Case-insensitive matching is supported. + REQUIRED when workflow_name is provided. DO NOT use status when workflow_name is provided. + status (List[str], optional): Array/list of workflow_run phases to filter by. + IMPORTANT: Must be an array/list type, NOT a JSON string. 
+          Correct format: ["Succeeded", "Failed"] (array/list) +          Incorrect format: '["Succeeded", "Failed"]' (JSON string - discouraged; it is parsed into a list as a fallback, and a parse failure returns an error) +          Common values: +          - 'Succeeded': Successfully completed workflow_runs +          - 'Failed': Workflow_runs that failed +          - 'Running': Currently executing workflow_runs +          - 'Error': Workflow_runs that encountered errors +          - 'Pending': Workflow_runs waiting to start +          Case-insensitive matching is supported. +          REQUIRED when workflow_name is NOT provided. DO NOT use status when workflow_name is provided - use workflow_phase instead. +          Example: ["Succeeded", "Failed"] # Array/list, not a string + +    Parameter Usage Rules: +    - If workflow_name is provided: MUST use workflow_phase (single phase string). Do NOT use status. +    - If workflow_name is NOT provided: MUST use status (list of phases). Do NOT use workflow_phase. +    - These are mutually exclusive usage patterns - do not mix them. +    started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: +          - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' +          - ISO 8601 format: '2024-01-01T00:00:00Z' +          Filters workflow_runs that started at or after this time. +          Only used when workflow_name is not provided. +    finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts: +          - Relative time format: 'now-1h', 'now-12h' +          - ISO 8601 format: '2024-01-01T00:00:00Z' +          Filters workflow_runs that finished at or after this time. +          Only used when workflow_name is not provided. +    from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. +    size (int, optional): Maximum number of search results to return. Defaults to 100. + +    Returns: +        Dict[str, Any]: Dictionary containing: +        - runs: List of workflow_run dictionaries. Each workflow_run contains: +            Run Metadata: +            - run_id: Unique identifier for this workflow_run +            - run_phase: Execution phase (Succeeded, Running, Failed, etc.) 
+ - run_started_at: When the workflow_run started (ISO timestamp) + - run_finished_at: When the workflow_run finished (ISO timestamp, None if still running) + - run_estimated_duration: Estimated execution duration + - run_progress: Progress indicator (e.g., "1/3") + - run_cpu_usage: CPU resource usage duration + - run_memory_usage: Memory resource usage duration + + Workflow Metadata: + - workflow_id: Reference to the workflow (template) used + - workflow_package_name: Package identifier + - workflow_cron_schedule: Cron schedule if scheduled + - workflow_cron_timezone: Timezone for cron schedule + - workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information + - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information + - workflow_creation_timestamp: When the workflow was created + - workflow_archiving_status: Archiving status + - total: Total count of workflow_runs matching the criteria (may be larger than the returned list if paginated) + - error: None if no error occurred, otherwise the error message + + Examples: + # CORRECT: Get succeeded workflow_runs for a specific workflow (use workflow_phase, NOT status) + result = get_workflow_runs_tool(workflow_name="atlan-snowflake-miner-1714638976", workflow_phase="Succeeded") + + # CORRECT: Get running workflow_runs for a specific workflow (use workflow_phase, NOT status) + result = get_workflow_runs_tool(workflow_name="atlan-snowflake-miner-1714638976", workflow_phase="Running") + + # CORRECT: Get failed workflow_runs with pagination (use workflow_phase, NOT status) + result = get_workflow_runs_tool(workflow_name="atlan-snowflake-miner-1714638976", workflow_phase="Failed", from_=0, size=50) + + # CORRECT: Get succeeded workflow_runs from the last 2 hours (across all workflows - use status, NOT workflow_phase) + result = get_workflow_runs_tool(status=["Succeeded"], started_at="now-2h") + + # CORRECT: Get failed workflow_runs from the last 24 
hours (across all workflows - use status, NOT workflow_phase) + result = get_workflow_runs_tool(status=["Failed"], started_at="now-24h") + + # CORRECT: Get multiple statuses with both time filters (across all workflows - use status, NOT workflow_phase) + result = get_workflow_runs_tool( + status=["Succeeded", "Failed"], + started_at="now-7d", + finished_at="now-1h" + ) + + # CORRECT: Get running workflow_runs (across all workflows - use status, NOT workflow_phase) + result = get_workflow_runs_tool(status=["Running"]) + + # Analyze workflow_run durations + runs = result.get("runs", []) + for run in runs: + if run.get("run_finished_at") and run.get("run_started_at"): + print(f"Workflow run {run['run_id']} took {run.get('run_estimated_duration')}") + + # Analyze failure rates + failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"] + print(f"Found {len(failed_runs)} failed workflow_runs in the time range") + """ + try: + # Parse list parameter if it comes as a JSON string (common with MCP clients) + status = parse_list_parameter(status) + except (json.JSONDecodeError, ValueError) as e: + return { + "runs": [], + "total": 0, + "error": f"Parameter parsing error for status: {str(e)}", + } + + return get_workflow_runs( + workflow_name=workflow_name, + workflow_phase=workflow_phase, + status=status, + started_at=started_at, + finished_at=finished_at, + from_=from_, + size=size, + ) + + @mcp.tool() def create_domains(domains) -> List[Dict[str, Any]]: """ diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index 198d289..fc395f7 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -21,6 +21,11 @@ DQRuleType, DQRuleSpecification, ) +from .workflows import ( + get_workflow_package_names, + get_workflows, + get_workflow_runs, +) __all__ = [ "search_assets", @@ -31,6 +36,9 @@ "create_glossary_category_assets", "create_glossary_assets", 
"create_glossary_term_assets", + "get_workflow_package_names", + "get_workflows", + "get_workflow_runs", "create_data_domain_assets", "create_data_product_assets", "CertificateStatus", diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py new file mode 100644 index 0000000..eef2c5b --- /dev/null +++ b/modelcontextprotocol/tools/workflows.py @@ -0,0 +1,493 @@ +"""Tools for retrieving and managing workflows in Atlan. + +Naming Convention Note: +- "workflow" refers to a workflow template definition +- "workflow_run" refers to an executed instance/run +- The extract_workflow_metadata() function returns fields prefixed with "run_*" for execution data + and "workflow_*" for workflow-level metadata to clearly distinguish between the two. +""" + +import logging +from typing import Dict, Any, Optional, Union, List + +from client import get_atlan_client +from pyatlan.model.enums import WorkflowPackage, AtlanWorkflowPhase + +logger = logging.getLogger(__name__) + + +def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str, Any]]: + """ + Process a workflow or workflow_run object into a standardized dictionary format. + + This function handles both workflow templates and workflow runs, returning standardized metadata dictionaries. + + Args: + result: The workflow or workflow_run object from PyAtlan (WorkflowSearchResult or Workflow). + include_dag: If True, includes the workflow DAG in the output. This only applies if the input is a workflow template. + For workflow_run instances, this parameter has no effect. + Returns: + Optional[Dict[str, Any]]: Serialized workflow or workflow_run dictionary using Pydantic's dict() method. + Returns None for unsupported workflow types. Returns empty dict {} if result is None or conversion fails. 
+ """ + if result is None: + return {} + + try: + dict_repr = result.dict(by_alias=True, exclude_unset=True) + except (AttributeError, TypeError) as e: + logger.warning(f"Failed to convert result to dict: {e}") + return {} + source = dict_repr.get("_source", None) + if source is None: + return dict_repr + + source_type = source.get("kind", None) + if source_type == "Workflow": + return extract_workflow_metadata(dict_repr) + elif source_type == "WorkflowTemplate": + return extract_workflow_template_metadata(dict_repr, include_dag=include_dag) + else: + logger.warning(f"Unsupported workflow type: {source_type}") + return None # Unsupported workflow type + + +def extract_workflow_template_metadata( + dict_repr: Dict[str, Any], include_dag: bool = False +) -> Dict[str, Any]: + """ + Extract useful metadata from a workflow template. + + This function extracts workflow metadata including package info, source system, certification status, etc. + + Args: + dict_repr: The workflow template object from the workflows array + include_dag: If True, includes the workflow DAG (workflow_steps and workflow_spec) in the output. + When False, returns only essential metadata fields. 
+ + Returns: + Dictionary containing extracted workflow metadata with fields like: + - template_name, package_name, package_version + - source_system, source_category, workflow_type + - certified, verified flags + - creator/modifier information + - If include_dag=True: workflow_steps and workflow_spec + """ + source = dict_repr.get("_source") or {} + metadata = source.get("metadata") or {} + labels = metadata.get("labels") or {} + annotations = metadata.get("annotations") or {} + + base_output = { + "template_name": metadata.get("name"), + "package_name": annotations.get("package.argoproj.io/name"), + "package_version": labels.get("package.argoproj.io/version"), + "source_system": labels.get("orchestration.atlan.com/source"), + "source_category": labels.get("orchestration.atlan.com/sourceCategory"), + "workflow_type": labels.get("orchestration.atlan.com/type"), + "certified": labels.get("orchestration.atlan.com/certified") == "true", + "verified": labels.get("orchestration.atlan.com/verified") == "true", + "creator_email": labels.get("workflows.argoproj.io/creator-email"), + "creator_username": labels.get( + "workflows.argoproj.io/creator-preferred-username" + ), + "modifier_email": labels.get("workflows.argoproj.io/modifier-email"), + "modifier_username": labels.get( + "workflows.argoproj.io/modifier-preferred-username" + ), + "creation_timestamp": metadata.get("creationTimestamp"), + "package_author": annotations.get("package.argoproj.io/author"), + "package_description": annotations.get("package.argoproj.io/description"), + "package_repository": annotations.get("package.argoproj.io/repository"), + } + + if not include_dag: + return base_output + else: + return { + **base_output, + "workflow_steps": annotations.get("orchestration.atlan.com/workflow-steps"), + "workflow_spec": source.get("spec"), + } + + +def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract comprehensive metadata from a workflow_run (executed instance/run). 
+ + The returned dictionary uses a two-tier naming convention: + - Fields prefixed with "run_*" contain execution-specific metadata (phase, timing, resource usage) + - Fields prefixed with "workflow_*" contain workflow-level metadata (template reference, package, creator) + + Args: + dict_repr: The workflow_run object dictionary representation + + Returns: + Dictionary containing comprehensive workflow_run metadata with fields: + Run Metadata (execution-specific): + - run_id: Unique identifier for this run + - run_phase: Execution phase (Succeeded, Running, Failed, etc.) + - run_started_at, run_finished_at: Execution timestamps + - run_estimated_duration, run_progress: Execution progress + - run_cpu_usage, run_memory_usage: Resource usage + + Workflow Metadata (workflow-level): + - workflow_id: Reference to the workflow (template) used + - workflow_package_name: Package identifier + - workflow_cron_schedule, workflow_cron_timezone: Scheduling info + - workflow_creator_*, workflow_modifier_*: Ownership information + - workflow_creation_timestamp, workflow_archiving_status: Lifecycle info + """ + source = dict_repr.get("_source") or {} + metadata = source.get("metadata") or {} + labels = metadata.get("labels") or {} + annotations = metadata.get("annotations") or {} + status = source.get("status") or {} + + return { + # Run Metadata + "run_id": dict_repr.get("_id") or metadata.get("name"), + "run_phase": status.get("phase"), + "run_started_at": status.get("startedat") or status.get("startedAt"), + "run_finished_at": status.get("finishedat") or status.get("finishedAt"), + "run_estimated_duration": status.get("estimatedDuration"), + "run_progress": status.get("progress"), + "run_cpu_usage": status.get("resourcesDuration", {}).get("cpu"), + "run_memory_usage": status.get("resourcesDuration", {}).get("memory"), + # Workflow Metadata + "workflow_id": labels.get("workflows.argoproj.io/workflow-template"), + # 'cron_workflow': 
labels.get('workflows.argoproj.io/cron-workflow'), + "workflow_package_name": annotations.get("package.argoproj.io/name"), + "workflow_cron_schedule": annotations.get("orchestration.atlan.com/schedule"), + "workflow_cron_timezone": annotations.get("orchestration.atlan.com/timezone"), + "workflow_creator_id": labels.get("workflows.argoproj.io/creator"), + "workflow_creator_email": labels.get("workflows.argoproj.io/creator-email"), + "workflow_creator_username": labels.get( + "workflows.argoproj.io/creator-preferred-username" + ), + "workflow_modifier_id": labels.get("workflows.argoproj.io/modifier"), + "workflow_modifier_email": labels.get("workflows.argoproj.io/modifier-email"), + "workflow_modifier_username": labels.get( + "workflows.argoproj.io/modifier-preferred-username" + ), + "workflow_creation_timestamp": metadata.get("creationTimestamp"), + "workflow_archiving_status": labels.get( + "workflows.argoproj.io/workflow-archiving-status" + ), + } + + +def get_workflow_package_names() -> List[str]: + """ + Get the values of all workflow packages. + """ + return [pkg.value for pkg in WorkflowPackage] + + +def get_workflows( + id: Optional[str] = None, + workflow_package_name: Optional[Union[WorkflowPackage, str]] = None, + max_results: int = 10, +) -> Dict[str, Any]: + """ + Retrieve workflows by ID or by package type. + + This function supports two main use cases: + 1. Get a specific workflow by ID (returns full workflow details including steps and spec) + 2. Get multiple workflows by package type (returns metadata only, no full details) + + Args: + id (str, optional): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). + If provided, returns a single workflow with full details (workflow_steps and workflow_spec). + workflow_package_name (Union[WorkflowPackage, str], optional): Workflow package type + (e.g., WorkflowPackage.SNOWFLAKE or "atlan-snowflake"). 
If provided, returns list of workflows + with metadata only (no workflow_steps or workflow_spec). + max_results (int, optional): Maximum number of workflows to return when getting by type. Defaults to 10. + Only used when workflow_package_name is provided. + + Returns: + Dict[str, Any]: Dictionary containing: + - workflows: List of workflow dictionaries. When getting by ID, contains single item. + - total: Total count of workflows (1 when getting by ID, actual count when getting by type) + - error: None if no error occurred, otherwise the error message + + Examples: + # Get a specific workflow by ID (full details) + result = get_workflows(id="atlan-snowflake-miner-1714638976") + # Returns: {"workflows": [{...full details with workflow_steps and workflow_spec...}], "total": 1, "error": None} + + # Get Snowflake workflows by type (metadata only) + from pyatlan.model.enums import WorkflowPackage + result = get_workflows(workflow_package_name=WorkflowPackage.SNOWFLAKE) + + # Get workflows with custom limit + result = get_workflows(workflow_package_name="atlan-bigquery", max_results=50) + """ + # Validate exactly one parameter is provided + if not id and not workflow_package_name: + raise ValueError("Either id or workflow_package_name must be provided") + if id and workflow_package_name: + raise ValueError("Cannot provide both id and workflow_package_name") + + try: + client = get_atlan_client() + + if id is not None: + # Get single workflow by ID - always include full details + logger.info(f"Retrieving workflow with ID: {id}") + + if not id: + raise ValueError("id cannot be empty") + + logger.debug(f"Calling client.workflow.find_by_id() with id={id}") + workflow = client.workflow.find_by_id(id=id) + + if workflow is None: + logger.warning(f"Workflow with ID '{id}' not found") + return { + "workflows": [], + "total": 0, + "error": f"Workflow with ID '{id}' not found", + } + + logger.info(f"Successfully retrieved workflow with ID: {id}") + # Always include full details 
(include_dag=True) when getting by ID + processed_workflow = _result_to_dict(workflow, include_dag=True) + + return { + "workflows": [processed_workflow] if processed_workflow else [], + "total": 1 if processed_workflow else 0, + "error": None, + } + + elif workflow_package_name is not None: + # Get workflows by type - always metadata only + logger.info( + f"Retrieving workflows with workflow_package_name={workflow_package_name}, max_results={max_results}" + ) + + logger.debug( + f"Calling client.workflow.find_by_type() with workflow_package_name={workflow_package_name}" + ) + results = client.workflow.find_by_type( + prefix=workflow_package_name, max_results=max_results + ) + + logger.info("Successfully retrieved workflows") + + # Process the response + workflows = results if results else [] + total = len(workflows) if workflows else 0 + + # Always metadata only (include_dag=False) when getting by type + processed_workflows = [ + _result_to_dict(workflow, include_dag=False) for workflow in workflows + ] + + return { + "workflows": processed_workflows, + "total": total, + "error": None, + } + + except Exception as e: + error_msg = f"Failed to retrieve workflows: {str(e)}" + logger.error(error_msg, exc_info=True) + return { + "workflows": [], + "total": 0, + "error": error_msg, + } + + +def get_workflow_runs( + workflow_name: Optional[str] = None, + workflow_phase: Optional[Union[AtlanWorkflowPhase, str]] = None, + status: Optional[Union[List[AtlanWorkflowPhase], List[str]]] = None, + started_at: Optional[str] = None, + finished_at: Optional[str] = None, + from_: int = 0, + size: int = 100, +) -> Dict[str, Any]: + """ + Retrieve workflow_runs with flexible filtering options. + + This function supports two main use cases: + 1. Filter by specific workflow name and phase (single workflow) + 2. 
Filter across all workflows by status list and optional time ranges + + Args: + workflow_name (str, optional): Name of the workflow (template) as displayed in the UI + (e.g., 'atlan-snowflake-miner-1714638976'). If provided, filters runs for that specific workflow. + workflow_phase (Union[AtlanWorkflowPhase, str], optional): Phase of the workflow_run + (e.g., Succeeded, Running, Failed). Required if workflow_name is provided and status is not. + status (Union[List[AtlanWorkflowPhase], List[str]], optional): List of workflow_run phases to filter by + (e.g., ['Succeeded', 'Failed']). Required if workflow_name is not provided. Can also be used + with workflow_name (will use first item from list as workflow_phase). + started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: + - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' + - ISO 8601 format: '2024-01-01T00:00:00Z' + Only used when workflow_name is not provided. + finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts: + - Relative time format: 'now-1h', 'now-12h' + - ISO 8601 format: '2024-01-01T00:00:00Z' + Only used when workflow_name is not provided. + from_ (int, optional): Starting index of the search results. Defaults to 0. + size (int, optional): Maximum number of search results to return. Defaults to 100. 
+ + Returns: + Dict[str, Any]: Dictionary containing: + - runs: List of workflow_runs with their details + - total: Total count of workflow_runs + - error: None if no error occurred, otherwise the error message + + Examples: + # Get succeeded workflow_runs for a specific workflow + from pyatlan.model.enums import AtlanWorkflowPhase + result = get_workflow_runs("atlan-snowflake-miner-1714638976", workflow_phase=AtlanWorkflowPhase.SUCCESS) + + # Get running workflow_runs for a specific workflow + result = get_workflow_runs("atlan-snowflake-miner-1714638976", workflow_phase="Running") + + # Get succeeded workflow_runs from the last 2 hours (across all workflows) + result = get_workflow_runs(status=["Succeeded"], started_at="now-2h") + + # Get failed workflow_runs with both time filters (across all workflows) + result = get_workflow_runs( + status=["Failed"], + started_at="now-24h", + finished_at="now-1h" + ) + + # Get multiple statuses (across all workflows) + result = get_workflow_runs(status=["Succeeded", "Failed"], started_at="now-7d") + """ + logger.info( + f"Retrieving workflow runs: workflow_name={workflow_name}, workflow_phase={workflow_phase}, " + f"status={status}, started_at={started_at}, finished_at={finished_at}, from_={from_}, size={size}" + ) + + try: + client = get_atlan_client() + + # Route based on whether workflow_name is provided + if workflow_name: + # Use get_runs() API - single workflow, single phase + if not workflow_phase and not status: + raise ValueError( + "Either workflow_phase or status must be provided when workflow_name is specified" + ) + + # Use workflow_phase if provided, otherwise use first item from status list + phase_to_use = workflow_phase + if not phase_to_use and status: + phase_to_use = ( + status[0] if isinstance(status, list) and len(status) > 0 else None + ) + + if not phase_to_use: + raise ValueError( + "workflow_phase or status must be provided when workflow_name is specified" + ) + + # Convert string to 
AtlanWorkflowPhase enum if needed + if isinstance(phase_to_use, str): + try: + phase_to_use = AtlanWorkflowPhase(phase_to_use) + except ValueError: + # Try case-insensitive match + for phase in AtlanWorkflowPhase: + if phase.value.upper() == phase_to_use.upper(): + phase_to_use = phase + break + else: + raise ValueError(f"Invalid workflow phase: {phase_to_use}") + + # Retrieve workflow runs using the pyatlan SDK + logger.debug( + f"Calling client.workflow.get_runs() with workflow_name={workflow_name}, workflow_phase={phase_to_use}" + ) + response = client.workflow.get_runs( + workflow_name=workflow_name, + workflow_phase=phase_to_use, + from_=from_, + size=size, + ) + + logger.info("Successfully retrieved workflow runs") + else: + # Use find_runs_by_status_and_time_range() API - all workflows, status list, time ranges + if not status: + raise ValueError( + "status must be provided when workflow_name is not specified" + ) + + # Convert string statuses to AtlanWorkflowPhase enums if needed + status_list = [] + for s in status: + if isinstance(s, str): + try: + status_list.append(AtlanWorkflowPhase(s)) + except ValueError: + # Try case-insensitive match + for phase in AtlanWorkflowPhase: + if phase.value.upper() == s.upper(): + status_list.append(phase) + break + else: + raise ValueError(f"Invalid workflow phase: {s}") + else: + status_list.append(s) + + # Retrieve workflow runs using the pyatlan SDK + logger.debug( + f"Calling client.workflow.find_runs_by_status_and_time_range() with " + f"status={status_list}, started_at={started_at}, finished_at={finished_at}" + ) + response = client.workflow.find_runs_by_status_and_time_range( + status=status_list, + started_at=started_at, + finished_at=finished_at, + from_=from_, + size=size, + ) + + logger.info("Successfully retrieved workflow runs by status and time range") + + # Process the response + runs = [] + total = 0 + if response and response.hits and response.hits.hits: + runs = response.hits.hits + # Handle total - it 
can be a dict with 'value' key or an object with .value attribute + total_obj = response.hits.total + if total_obj: + if isinstance(total_obj, dict): + total = total_obj.get("value", len(runs)) + elif hasattr(total_obj, "value"): + total = total_obj.value + else: + total = len(runs) + else: + total = len(runs) + + # Extract key run information using _result_to_dict for proper serialization + processed_runs = [] + for run in runs: + processed_run = _result_to_dict(run) + processed_runs.append(processed_run) + + return { + "runs": processed_runs, + "total": total, + "error": None, + } + + except Exception as e: + error_msg = f"Failed to retrieve workflow runs: {str(e)}" + logger.error(error_msg, exc_info=True) + return { + "runs": [], + "total": 0, + "error": error_msg, + }