@mcp.tool()
def get_workflow_package_names_tool() -> List[str]:
    """
    List all available workflow package names.

    Returns:
        List[str]: Workflow package names (e.g., 'atlan-snowflake',
        'atlan-bigquery') that can be passed to the other workflow tools.

    Examples:
        packages = get_workflow_package_names_tool()
    """
    return get_workflow_package_names()


@mcp.tool()
def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10) -> Dict[str, Any]:
    """
    Retrieve workflows by workflow package name.

    Naming Convention Note (Argo/Kubernetes):
    - "WorkflowTemplate" (kind="WorkflowTemplate") = template definition ("Workflow" in the UI)
    - "Workflow" (kind="Workflow") = executed instance/run ("WorkflowRun" in the UI)

    This tool primarily returns executed workflow instances (kind="Workflow"),
    though WorkflowTemplate definitions may occasionally appear in results.

    IMPORTANT: Only metadata and execution status are returned — NOT the full
    workflow instructions, steps, or transformations. For the complete workflow
    specification use get_workflow_by_id_tool instead.

    When to use:
    - Listing multiple workflows for a given package type.
    - When metadata (status, timing, resource usage) is sufficient.
    - NOT when the complete workflow definition is required.

    Args:
        workflow_package_name (str): Package name such as 'atlan-snowflake',
            'atlan-bigquery', or the enum name like 'SNOWFLAKE'.
        max_results (int, optional): Maximum number of workflows to return. Defaults to 10.

    Returns:
        Dict[str, Any]: Dictionary with:
            - workflows: list of workflow dicts. Workflow (run) entries carry
              run_* fields (run_id, run_phase, run_started_at, run_finished_at,
              run_estimated_duration, run_progress, run_cpu_usage, run_memory_usage)
              plus workflow_* fields (workflow_id, workflow_package_name,
              workflow_cron_schedule, workflow_cron_timezone, creator/modifier
              info, workflow_creation_timestamp, workflow_archiving_status).
              WorkflowTemplate entries carry template_name, package_name,
              package_version, source_system, source_category, workflow_type,
              certified, verified, creator/modifier info, creation_timestamp,
              and package_author/description/repository.
            - total: total count of workflows found.
            - error: None on success, otherwise the error message.

    Examples:
        result = get_workflows_by_type_tool("atlan-snowflake")
        result = get_workflows_by_type_tool("atlan-bigquery", max_results=50)
        result = get_workflows_by_type_tool("SNOWFLAKE")
    """
    return get_workflows_by_type(workflow_package_name=workflow_package_name, max_results=max_results)


@mcp.tool()
def get_workflow_by_id_tool(id: str) -> Dict[str, Any]:
    """
    Retrieve a specific workflow by its ID, including its full definition.

    Naming Convention Note (Argo/Kubernetes):
    - "WorkflowTemplate" (kind="WorkflowTemplate") = template definition ("Workflow" in the UI)
    - "Workflow" (kind="Workflow") = executed instance/run ("WorkflowRun" in the UI)

    IMPORTANT: This is the ONLY workflow tool that returns the complete workflow
    instructions — all steps, transformations, and the workflow DAG (it uses
    include_dag=True internally). Every other workflow tool returns metadata only.

    Note: Only workflows that have been run are found (kind="Workflow"
    instances, not WorkflowTemplates).

    When to use:
    - When the COMPLETE definition (workflow_spec, workflow_steps) is needed.
    - NOT when basic metadata is enough — prefer the other workflow tools then.

    Args:
        id (str): Unique workflow identifier (e.g., 'atlan-snowflake-miner-1714638976').

    Returns:
        Dict[str, Any]: Dictionary with:
            - workflow: dict with run_* execution fields (run_id, run_phase,
              run_started_at, run_finished_at, run_estimated_duration,
              run_progress, run_cpu_usage, run_memory_usage), workflow_* fields
              (workflow_id, workflow_package_name, cron schedule/timezone,
              creator/modifier info, workflow_creation_timestamp,
              workflow_archiving_status), and the full specification with all
              steps and transformations; None if not found.
            - error: None on success, otherwise the error message.

    Examples:
        result = get_workflow_by_id_tool("atlan-snowflake-miner-1714638976")
        if result.get("workflow", {}).get("run_phase") == "Succeeded":
            print("Workflow completed successfully")
    """
    return get_workflow_by_id(id=id)


@mcp.tool()
def get_workflow_runs_tool(
    workflow_name: str,
    workflow_phase: str,
    from_: int = 0,
    size: int = 100,
) -> Dict[str, Any]:
    """
    Retrieve all runs of a specific workflow filtered by execution phase.

    Naming Convention Note (Argo/Kubernetes):
    - "WorkflowTemplate" (kind="WorkflowTemplate") = template definition ("Workflow" in the UI)
    - "Workflow" (kind="Workflow") = executed instance/run ("WorkflowRun" in the UI)

    Returns run instances (kind="Workflow") matching the given template name
    and phase, each with execution details, timing, and resource usage.

    IMPORTANT: Metadata and execution status only — NOT the full workflow steps
    or transformations. Use get_workflow_by_id_tool for the complete spec.

    Args:
        workflow_name (str): WorkflowTemplate name as shown in the UI
            (e.g., 'atlan-snowflake-miner-1714638976'), not an individual run ID.
        workflow_phase (str): Phase filter — 'Succeeded', 'Running', 'Failed',
            'Error', 'Pending'. Case-insensitive matching is supported.
        from_ (int, optional): Pagination start index. Defaults to 0.
        size (int, optional): Maximum results to return. Defaults to 100.

    Returns:
        Dict[str, Any]: Dictionary with:
            - runs: list of run dicts carrying run_* execution fields (run_id,
              run_phase, run_started_at, run_finished_at, run_estimated_duration,
              run_progress, run_cpu_usage, run_memory_usage) and workflow_*
              metadata fields (workflow_id, workflow_package_name, cron info,
              creator/modifier info, workflow_creation_timestamp,
              workflow_archiving_status).
            - total: total count of matching runs.
            - error: None on success, otherwise the error message.

    Examples:
        result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Succeeded")
        result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Failed", from_=0, size=50)
    """
    return get_workflow_runs(
        workflow_name=workflow_name,
        workflow_phase=workflow_phase,
        from_=from_,
        size=size,
    )


@mcp.tool()
def get_workflow_run_by_id_tool(id: str) -> Dict[str, Any]:
    """
    Find a single workflow run by its run ID.

    Naming Convention Note (Argo/Kubernetes):
    - "WorkflowTemplate" (kind="WorkflowTemplate") = template definition ("Workflow" in the UI)
    - "Workflow" (kind="Workflow") = executed instance/run ("WorkflowRun" in the UI)

    Retrieves one run instance (kind="Workflow") by its unique ID. Only executed
    runs are found, not templates.

    IMPORTANT: Metadata and execution status only — NOT the full workflow steps
    or transformations. Use get_workflow_by_id_tool for the complete spec.

    Args:
        id (str): Run ID, typically the workflow name plus a unique suffix
            (e.g., 'atlan-snowflake-miner-1714638976-t7s8b').

    Returns:
        Dict[str, Any]: Dictionary with:
            - run: dict with run_* execution fields (run_id, run_phase,
              run_started_at, run_finished_at, run_estimated_duration,
              run_progress, run_cpu_usage, run_memory_usage) and workflow_*
              metadata fields (workflow_id, workflow_package_name, cron info,
              creator/modifier info, workflow_creation_timestamp,
              workflow_archiving_status); None if not found.
            - error: None on success, otherwise the error message.

    Examples:
        result = get_workflow_run_by_id_tool("atlan-snowflake-miner-1714638976-t7s8b")
        run = result.get("run")
        if run:
            print(f"Status: {run.get('run_phase')}")
    """
    return get_workflow_run_by_id(id=id)


@mcp.tool()
def get_workflow_runs_by_status_and_time_range_tool(
    status: List[str],
    started_at: str | None = None,
    finished_at: str | None = None,
    from_: int = 0,
    size: int = 100,
) -> Dict[str, Any]:
    """
    Retrieve workflow runs filtered by status and (optionally) time range.

    Naming Convention Note (Argo/Kubernetes):
    - "WorkflowTemplate" (kind="WorkflowTemplate") = template definition ("Workflow" in the UI)
    - "Workflow" (kind="Workflow") = executed instance/run ("WorkflowRun" in the UI)

    Searches run instances (kind="Workflow") by execution status and optional
    time bounds — useful for monitoring, debugging, and failure analysis across
    multiple workflows.

    IMPORTANT: Metadata and execution status only — NOT the full workflow steps
    or transformations. Use get_workflow_by_id_tool for the complete spec.

    Args:
        status (List[str]): Phases to match, e.g. ['Succeeded', 'Failed'].
            Common values: 'Succeeded', 'Failed', 'Running', 'Error', 'Pending'.
            Case-insensitive matching is supported.
        started_at (str, optional): Lower bound on 'status.startedAt'.
            Accepts relative times ('now-2h', 'now-7d') or ISO 8601
            ('2024-01-01T00:00:00Z').
        finished_at (str, optional): Lower bound on 'status.finishedAt'.
            Same formats as started_at.
        from_ (int, optional): Pagination start index. Defaults to 0.
        size (int, optional): Maximum results to return. Defaults to 100.

    Returns:
        Dict[str, Any]: Dictionary with:
            - runs: list of run dicts carrying run_* execution fields (run_id,
              run_phase, run_started_at, run_finished_at, run_estimated_duration,
              run_progress, run_cpu_usage, run_memory_usage) and workflow_*
              metadata fields (workflow_id, workflow_package_name, cron info,
              creator/modifier info, workflow_creation_timestamp,
              workflow_archiving_status).
            - total: total matching count (may exceed the page returned).
            - error: None on success, otherwise the error message.

    Examples:
        result = get_workflow_runs_by_status_and_time_range_tool(
            status=["Succeeded"], started_at="now-2h"
        )
        result = get_workflow_runs_by_status_and_time_range_tool(
            status=["Succeeded", "Failed"], started_at="now-7d", finished_at="now-1h"
        )
    """
    return get_workflow_runs_by_status_and_time_range(
        status=status,
        started_at=started_at,
        finished_at=finished_at,
        from_=from_,
        size=size,
    )


def main():
    """Main entry point for the Atlan MCP Server."""
"""Tools for retrieving and managing workflows in Atlan.

Naming Convention Note:
The Argo/Kubernetes API uses terminology that can be confusing:
- "WorkflowTemplate" (kind="WorkflowTemplate") = template definition (what users see as "Workflow" in the UI)
- "Workflow" (kind="Workflow") = executed instance/run (what users see as "WorkflowRun" in the UI)

Throughout this module:
- Functions dealing with "WorkflowTemplate" handle template definitions.
- Functions dealing with "Workflow" handle executed instances/runs.
- extract_workflow_metadata() returns fields prefixed with "run_*" for execution
  data and "workflow_*" for workflow-level metadata to keep the two apart.
"""

import logging
from typing import Dict, Any, Optional, Union, List

from client import get_atlan_client
from pyatlan.model.enums import WorkflowPackage, AtlanWorkflowPhase
from pyatlan.model.workflow import Workflow, WorkflowSchedule

logger = logging.getLogger(__name__)


def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str, Any]]:
    """
    Process a workflow object into a standardized dictionary format.

    Handles both Argo kinds (see module docstring for the naming confusion):
    - kind="Workflow": executed run — dispatched to extract_workflow_metadata()
    - kind="WorkflowTemplate": template — dispatched to extract_workflow_template_metadata()

    Args:
        result: Workflow object from PyAtlan (WorkflowSearchResult or Workflow);
            must expose a Pydantic-style ``dict()`` method.
        include_dag: If True, include the workflow DAG in the output. Only
            meaningful for WorkflowTemplate inputs; ignored for runs.

    Returns:
        Optional[Dict[str, Any]]: Extracted metadata dictionary, or the raw
        serialized dict when no ``_source`` key is present. Returns {} if
        ``result`` is None or serialization fails, and None for an
        unrecognized ``kind``.
    """
    if result is None:
        return {}

    try:
        # by_alias preserves the original Argo field names (e.g. "_source").
        dict_repr = result.dict(by_alias=True, exclude_unset=True)
    except (AttributeError, TypeError) as e:
        # Lazy %-style args: formatting happens only if the record is emitted.
        logger.warning("Failed to convert result to dict: %s", e)
        return {}

    source = dict_repr.get("_source")
    if source is None:
        # No Argo payload to interpret — hand back the raw serialization.
        return dict_repr

    source_type = source.get("kind")
    if source_type == "Workflow":
        return extract_workflow_metadata(dict_repr)
    if source_type == "WorkflowTemplate":
        return extract_workflow_template_metadata(dict_repr, include_dag=include_dag)

    logger.warning("Unsupported workflow type: %s", source_type)
    return None  # Unsupported workflow type
def extract_workflow_template_metadata(dict_repr: Dict[str, Any], include_dag: bool = False) -> Dict[str, Any]:
    """
    Extract useful metadata from an Argo WorkflowTemplate.

    Note: In Argo terminology, "WorkflowTemplate" is the template definition
    (what users see as "Workflow" in the UI).

    Args:
        dict_repr: Serialized search hit containing a ``_source`` entry with
            kind="WorkflowTemplate".
        include_dag: If True, also include the workflow DAG (workflow_steps and
            workflow_spec). When False, only essential metadata is returned.

    Returns:
        Dict with template_name, package_name/version, source_system,
        source_category, workflow_type, certified/verified flags,
        creator/modifier info, creation_timestamp, and package
        author/description/repository. With include_dag=True, additionally
        workflow_steps and workflow_spec.
    """
    # Argo stores everything of interest under metadata labels/annotations;
    # `or {}` guards against keys that are present but explicitly null.
    source = dict_repr.get("_source") or {}
    metadata = source.get('metadata') or {}
    labels = metadata.get('labels') or {}
    annotations = metadata.get('annotations') or {}

    base_output = {
        'template_name': metadata.get('name'),
        'package_name': annotations.get('package.argoproj.io/name'),
        'package_version': labels.get('package.argoproj.io/version'),
        'source_system': labels.get('orchestration.atlan.com/source'),
        'source_category': labels.get('orchestration.atlan.com/sourceCategory'),
        'workflow_type': labels.get('orchestration.atlan.com/type'),
        # Labels store booleans as the string 'true'.
        'certified': labels.get('orchestration.atlan.com/certified') == 'true',
        'verified': labels.get('orchestration.atlan.com/verified') == 'true',
        'creator_email': labels.get('workflows.argoproj.io/creator-email'),
        'creator_username': labels.get('workflows.argoproj.io/creator-preferred-username'),
        'modifier_email': labels.get('workflows.argoproj.io/modifier-email'),
        'modifier_username': labels.get('workflows.argoproj.io/modifier-preferred-username'),
        'creation_timestamp': metadata.get('creationTimestamp'),
        'package_author': annotations.get('package.argoproj.io/author'),
        'package_description': annotations.get('package.argoproj.io/description'),
        'package_repository': annotations.get('package.argoproj.io/repository')
    }

    if not include_dag:
        return base_output
    return {
        **base_output,
        'workflow_steps': annotations.get('orchestration.atlan.com/workflow-steps'),
        'workflow_spec': source.get('spec')
    }


def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract comprehensive metadata from a Workflow (executed instance/run).

    Note: In Argo terminology, "Workflow" (kind="Workflow") is an executed
    instance/run (what users see as "WorkflowRun" in the UI) — distinct from
    "WorkflowTemplate".

    Two-tier naming convention in the result:
    - "run_*" fields: execution-specific metadata (phase, timing, resource usage)
    - "workflow_*" fields: workflow-level metadata (template ref, package, creator)

    Args:
        dict_repr: Serialized search hit containing a ``_source`` entry with
            kind="Workflow".

    Returns:
        Dict with run metadata (run_id, run_phase, run_started_at,
        run_finished_at, run_estimated_duration, run_progress, run_cpu_usage,
        run_memory_usage) and workflow metadata (workflow_id,
        workflow_package_name, cron schedule/timezone, creator/modifier info,
        workflow_creation_timestamp, workflow_archiving_status).
    """
    source = dict_repr.get("_source") or {}
    metadata = source.get('metadata') or {}
    labels = metadata.get('labels') or {}
    annotations = metadata.get('annotations') or {}
    status = source.get('status') or {}
    # Guard against 'resourcesDuration' being present but null — a plain
    # .get('resourcesDuration', {}) would return None and crash on .get().
    resources = status.get('resourcesDuration') or {}

    return {
        # Run Metadata
        'run_id': dict_repr.get('_id') or metadata.get('name'),
        'run_phase': status.get('phase'),
        # Some serializations lowercase the key; accept both spellings.
        'run_started_at': status.get('startedat') or status.get('startedAt'),
        'run_finished_at': status.get('finishedat') or status.get('finishedAt'),
        'run_estimated_duration': status.get('estimatedDuration'),
        'run_progress': status.get('progress'),
        'run_cpu_usage': resources.get('cpu'),
        'run_memory_usage': resources.get('memory'),

        # Workflow Metadata
        'workflow_id': labels.get('workflows.argoproj.io/workflow-template'),
        'workflow_package_name': annotations.get('package.argoproj.io/name'),
        'workflow_cron_schedule': annotations.get('orchestration.atlan.com/schedule'),
        'workflow_cron_timezone': annotations.get('orchestration.atlan.com/timezone'),
        'workflow_creator_id': labels.get('workflows.argoproj.io/creator'),
        'workflow_creator_email': labels.get('workflows.argoproj.io/creator-email'),
        'workflow_creator_username': labels.get('workflows.argoproj.io/creator-preferred-username'),
        'workflow_modifier_id': labels.get('workflows.argoproj.io/modifier'),
        'workflow_modifier_email': labels.get('workflows.argoproj.io/modifier-email'),
        'workflow_modifier_username': labels.get('workflows.argoproj.io/modifier-preferred-username'),
        'workflow_creation_timestamp': metadata.get('creationTimestamp'),
        'workflow_archiving_status': labels.get('workflows.argoproj.io/workflow-archiving-status')
    }


def get_workflow_package_names() -> List[str]:
    """Return the string values of all known workflow packages."""
    return [pkg.value for pkg in WorkflowPackage]
+ + Returns: + Dict[str, Any]: Dictionary containing: + - workflows: List of workflows with their configurations + - total: Total count of workflows + - error: None if no error occurred, otherwise the error message + + Examples: + # Get Snowflake workflows + from pyatlan.model.enums import WorkflowPackage + result = get_workflows_by_type(WorkflowPackage.SNOWFLAKE) + + # Get workflows with custom limit + result = get_workflows_by_type(WorkflowPackage.BIGQUERY, max_results=50) + """ + logger.info(f"Starting workflow retrieval with workflow_package_name={workflow_package_name}, max_results={max_results}") + + try: + client = get_atlan_client() + + # Retrieve workflows using the pyatlan SDK + logger.debug(f"Calling client.workflow.find_by_type() with workflow_package_name={workflow_package_name}") + results = client.workflow.find_by_type(prefix=workflow_package_name, max_results=max_results) + + logger.info(f"Successfully retrieved workflows") + + # Process the response + workflows = results if results else [] + total = len(workflows) if workflows else 0 + + # Extract key workflow information + processed_workflows = [_result_to_dict(workflow) for workflow in workflows] + + return { + "workflows": processed_workflows, + "total": total, + "error": None, + } + + except Exception as e: + error_msg = f"Failed to retrieve workflows: {str(e)}" + logger.error(error_msg, exc_info=True) + return { + "workflows": [], + "total": 0, + "error": error_msg, + } + + +def get_workflow_by_id(id: str) -> Dict[str, Any]: + """ + Retrieve a specific workflow by its ID. Note: Only workflows that have been run will be found. + + Args: + id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). 
def get_workflow_runs(
    workflow_name: str,
    workflow_phase: Union[AtlanWorkflowPhase, str],
    from_: int = 0,
    size: int = 100,
) -> Dict[str, Any]:
    """
    Retrieve all workflow runs for a specific workflow and phase.

    Args:
        workflow_name (str): Name of the workflow as displayed in the UI
            (e.g., 'atlan-snowflake-miner-1714638976').
        workflow_phase (Union[AtlanWorkflowPhase, str]): Phase of the workflow
            (e.g., Succeeded, Running, Failed). Strings are matched
            case-insensitively against the enum values.
        from_ (int, optional): Starting index of the search results. Defaults to 0.
        size (int, optional): Maximum number of search results to return. Defaults to 100.

    Returns:
        Dict[str, Any]: Dictionary containing:
            - runs: List of workflow runs with their details
            - total: Total count of runs
            - error: None if no error occurred, otherwise the error message

    Examples:
        # Get succeeded workflow runs
        from pyatlan.model.enums import AtlanWorkflowPhase
        result = get_workflow_runs("atlan-snowflake-miner-1714638976", AtlanWorkflowPhase.SUCCESS)

        # Get running workflows
        result = get_workflow_runs("atlan-snowflake-miner-1714638976", "Running")
    """
    logger.info(
        "Retrieving workflow runs: workflow_name=%s, phase=%s, from_=%s, size=%s",
        workflow_name,
        workflow_phase,
        from_,
        size,
    )

    try:
        if not workflow_name:
            raise ValueError("workflow_name cannot be empty")
        if not workflow_phase:
            raise ValueError("workflow_phase cannot be empty")

        # Accept plain strings for the phase: try the exact enum value first,
        # then fall back to a case-insensitive match against the enum values.
        if isinstance(workflow_phase, str):
            try:
                workflow_phase = AtlanWorkflowPhase(workflow_phase)
            except ValueError:
                wanted = workflow_phase.upper()
                for phase in AtlanWorkflowPhase:
                    if phase.value.upper() == wanted:
                        workflow_phase = phase
                        break
                else:
                    raise ValueError(f"Invalid workflow phase: {workflow_phase}")

        client = get_atlan_client()

        logger.debug(
            "Calling client.workflow.get_runs() with workflow_name=%s, workflow_phase=%s",
            workflow_name,
            workflow_phase,
        )
        response = client.workflow.get_runs(
            workflow_name=workflow_name,
            workflow_phase=workflow_phase,
            from_=from_,
            size=size,
        )

        logger.info("Successfully retrieved workflow runs")

        runs = []
        total = 0
        if response and response.hits and response.hits.hits:
            runs = response.hits.hits
            total = len(runs)
            # The ES-style total can be a dict ({'value': N}) or an object
            # exposing a .value attribute; fall back to len(runs) otherwise.
            total_obj = response.hits.total
            if total_obj:
                if isinstance(total_obj, dict):
                    total = total_obj.get("value", total)
                elif hasattr(total_obj, "value"):
                    total = total_obj.value

        return {
            "runs": [_result_to_dict(run) for run in runs],
            "total": total,
            "error": None,
        }

    except Exception as e:
        error_msg = f"Failed to retrieve workflow runs: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {
            "runs": [],
            "total": 0,
            "error": error_msg,
        }
def get_workflow_run_by_id(id: str) -> Dict[str, Any]:
    """
    Find workflow runs based on their ID.

    Note: Only workflow runs will be found.

    Args:
        id (str): The ID of the workflow run to find
            (e.g., 'atlan-snowflake-miner-1714638976-t7s8b').

    Returns:
        Dict[str, Any]: Dictionary with:
            - run: The workflow run object with its details, or None if not found
            - error: None on success, otherwise the error message

    Examples:
        # Get a specific workflow run
        result = get_workflow_run_by_id("atlan-snowflake-miner-1714638976-t7s8b")
    """
    logger.info(f"Retrieving workflow run with ID: {id}")

    try:
        # Validate up front; the outer handler converts this into an error payload.
        if not id:
            raise ValueError("id cannot be empty")

        logger.debug(f"Calling client.workflow.find_run_by_id() with id={id}")
        run = get_atlan_client().workflow.find_run_by_id(id=id)

        if run is None:
            logger.warning(f"Workflow run with ID '{id}' not found")
            return {"run": None, "error": f"Workflow run with ID '{id}' not found"}

        logger.info(f"Successfully retrieved workflow run with ID: {id}")
        # include_dag=True keeps the full execution details on the serialized run.
        return {"run": _result_to_dict(run, include_dag=True), "error": None}

    except Exception as e:
        error_msg = f"Failed to retrieve workflow run with ID '{id}': {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {"run": None, "error": error_msg}
def get_workflow_runs_by_status_and_time_range(
    status: Union[List[AtlanWorkflowPhase], List[str]],
    started_at: Optional[str] = None,
    finished_at: Optional[str] = None,
    from_: int = 0,
    size: int = 100,
) -> Dict[str, Any]:
    """
    Retrieve workflow runs based on their status and time range.

    Args:
        status (Union[List[AtlanWorkflowPhase], List[str]]): List of workflow phases to
            filter by (e.g., ['Succeeded', 'Failed'] or
            [AtlanWorkflowPhase.SUCCESS, AtlanWorkflowPhase.FAILED]). Strings are
            matched case-insensitively against the enum values.
        started_at (str, optional): Lower bound on 'status.startedAt'
            (e.g., 'now-2h', '2024-01-01T00:00:00Z').
        finished_at (str, optional): Lower bound on 'status.finishedAt'
            (e.g., 'now-1h', '2024-01-01T00:00:00Z').
        from_ (int, optional): Starting index of the search results. Defaults to 0.
        size (int, optional): Maximum number of search results to return. Defaults to 100.

    Returns:
        Dict[str, Any]: Dictionary containing:
            - runs: List of workflow runs with their details
            - total: Total count of runs
            - error: None if no error occurred, otherwise the error message

    Examples:
        # Get succeeded runs from the last 2 hours
        from pyatlan.model.enums import AtlanWorkflowPhase
        result = get_workflow_runs_by_status_and_time_range(
            status=[AtlanWorkflowPhase.SUCCESS],
            started_at="now-2h"
        )

        # Get failed runs with both time filters
        result = get_workflow_runs_by_status_and_time_range(
            status=["Failed"],
            started_at="now-24h",
            finished_at="now-1h"
        )

        # Get multiple statuses
        result = get_workflow_runs_by_status_and_time_range(
            status=["Succeeded", "Failed"],
            started_at="now-7d"
        )
    """
    logger.info(
        "Retrieving workflow runs by status and time range: status=%s, "
        "started_at=%s, finished_at=%s, from_=%s, size=%s",
        status,
        started_at,
        finished_at,
        from_,
        size,
    )

    try:
        if not status:
            raise ValueError("status cannot be empty")

        # Normalize string phases to AtlanWorkflowPhase: exact enum value first,
        # then a case-insensitive match against the enum values.
        status_list = []
        for s in status:
            if not isinstance(s, str):
                status_list.append(s)
                continue
            try:
                status_list.append(AtlanWorkflowPhase(s))
            except ValueError:
                wanted = s.upper()
                for phase in AtlanWorkflowPhase:
                    if phase.value.upper() == wanted:
                        status_list.append(phase)
                        break
                else:
                    raise ValueError(f"Invalid workflow phase: {s}")

        client = get_atlan_client()

        logger.debug(
            "Calling client.workflow.find_runs_by_status_and_time_range() with "
            "status=%s, started_at=%s, finished_at=%s",
            status_list,
            started_at,
            finished_at,
        )
        response = client.workflow.find_runs_by_status_and_time_range(
            status=status_list,
            started_at=started_at,
            finished_at=finished_at,
            from_=from_,
            size=size,
        )

        logger.info("Successfully retrieved workflow runs by status and time range")

        runs = []
        total = 0
        if response and response.hits and response.hits.hits:
            runs = response.hits.hits
            total = len(runs)
            # The ES-style total can be a dict ({'value': N}) or an object
            # exposing a .value attribute; fall back to len(runs) otherwise.
            total_obj = response.hits.total
            if total_obj:
                if isinstance(total_obj, dict):
                    total = total_obj.get("value", total)
                elif hasattr(total_obj, "value"):
                    total = total_obj.value

        return {
            "runs": [_result_to_dict(run) for run in runs],
            "total": total,
            "error": None,
        }

    except Exception as e:
        error_msg = f"Failed to retrieve workflow runs by status and time range: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {
            "runs": [],
            "total": 0,
            "error": error_msg,
        }
Fri, 14 Nov 2025 13:29:49 -0800 Subject: [PATCH 02/11] Remove get_workflow_run_by_id and standardize workflow terminology - Remove get_workflow_run_by_id function as it provides no additional value over get_workflow_runs - Standardize terminology: workflow = WorkflowTemplate, workflow_run = Workflow - Update all docstrings to use consistent 'workflow' and 'workflow_run' terminology - Clarify that only functions starting with get_workflow_run return workflow_run information - Update LLM instructions in server.py to match implementation --- modelcontextprotocol/server.py | 268 ++++++++---------------- modelcontextprotocol/tools/__init__.py | 2 - modelcontextprotocol/tools/workflows.py | 132 ++++-------- 3 files changed, 121 insertions(+), 281 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 7fbaf992..563b70bd 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -16,7 +16,6 @@ get_workflows_by_type, get_workflow_by_id, get_workflow_runs, - get_workflow_run_by_id, get_workflow_runs_by_status_and_time_range, UpdatableAttribute, CertificateStatus, @@ -932,27 +931,24 @@ def get_workflow_package_names_tool() -> List[str]: @mcp.tool() def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10) -> Dict[str, Any]: """ - Retrieve workflows by workflow package name. + Retrieve workflows (WorkflowTemplate) by workflow package name. Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what users see as "Workflow" in UI) - - "Workflow" (kind="Workflow") = Executed instance/run (what users see as "WorkflowRun" in UI) + - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) + - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - This tool primarily retrieves executed workflow instances (Workflow kind) that have been run. 
- It may occasionally return WorkflowTemplate definitions if they are included in the search results. - - IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow + IMPORTANT: This tool returns only basic metadata. It does NOT return full workflow instructions, steps, or transformations. To get complete workflow details including all steps and transformations, use get_workflow_by_id_tool instead. + Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. + To get workflow_run information, use functions that start with get_workflow_run. + When to use this tool: - Use when you need to list multiple workflows by package type - - Use when you only need metadata (status, timing, resource usage) without full workflow details + - Use when you only need metadata (package info, certification status) without full workflow details - Do NOT use if you need the complete workflow specification with all steps and transformations - - The returned workflows can be of two types: - 1. Workflow (kind="Workflow"): An executed workflow instance/run with execution status, timing, and resource usage. - 2. WorkflowTemplate (kind="WorkflowTemplate"): A template definition (if templates are included in results). + - Do NOT use if you need workflow_run (execution) information Args: workflow_package_name (str): The workflow package name (e.g., 'atlan-snowflake', 'atlan-bigquery', 'SNOWFLAKE'). @@ -961,38 +957,16 @@ def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10 Returns: Dict[str, Any]: Dictionary containing: - - workflows: List of workflow dictionaries. Each workflow contains: - For Workflow type: - Run Metadata: - - run_id: Unique identifier for this workflow run - - run_phase: Execution phase (Succeeded, Running, Failed, etc.) 
- - run_started_at: When the workflow run started - - run_finished_at: When the workflow run finished - - run_estimated_duration: Estimated execution duration - - run_progress: Progress indicator - - run_cpu_usage: CPU resource usage - - run_memory_usage: Memory resource usage - - Workflow Metadata: - - workflow_id: Reference to the workflow template used - - workflow_package_name: Package identifier - - workflow_cron_schedule: Cron schedule if scheduled - - workflow_cron_timezone: Timezone for cron schedule - - workflow_creator_email, workflow_creator_username: Creator information - - workflow_modifier_email, workflow_modifier_username: Last modifier information - - workflow_creation_timestamp: When the workflow was created - - workflow_archiving_status: Archiving status - For WorkflowTemplate type: - - template_name: Name of the template - - package_name, package_version: Package information - - source_system, source_category, workflow_type: Source information - - certified, verified: Certification status - - creator_email, creator_username: Creator information - - modifier_email, modifier_username: Last modifier information - - creation_timestamp: When template was created - - package_author, package_description, package_repository: Package metadata + - workflows: List of workflow (WorkflowTemplate) dictionaries. 
Each workflow contains: + - template_name: Name of the template + - package_name, package_version: Package information + - source_system, source_category, workflow_type: Source information + - certified, verified: Certification status + - creator_email, creator_username: Creator information + - modifier_email, modifier_username: Last modifier information + - creation_timestamp: When template was created + - package_author, package_description, package_repository: Package metadata - total: Total count of workflows found - - error: None if no error occurred, otherwise the error message Examples: # Get Snowflake workflows @@ -1010,11 +984,11 @@ def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10 @mcp.tool() def get_workflow_by_id_tool(id: str) -> Dict[str, Any]: """ - Retrieve a specific workflow by its ID. + Retrieve a specific workflow (WorkflowTemplate) by its ID. Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what users see as "Workflow" in UI) - - "Workflow" (kind="Workflow") = Executed instance/run (what users see as "WorkflowRun" in UI) + - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) + - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) IMPORTANT: This is the ONLY tool that returns full workflow instructions with all steps and transformations. All other workflow tools (get_workflows_by_type_tool, get_workflow_runs_tool, etc.) return only metadata @@ -1023,55 +997,42 @@ def get_workflow_by_id_tool(id: str) -> Dict[str, Any]: This tool uses include_dag=True internally, which means it returns the complete workflow specification including the workflow DAG (directed acyclic graph), all steps, transformations, and execution details. - Note: Only workflows that have been run will be found (Workflow instances with kind="Workflow", not WorkflowTemplates). 
+ Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. + To get workflow_run information, use functions that start with get_workflow_run. When to use this tool: - Use when you need the COMPLETE workflow definition with all steps and transformations - Use when you need to understand the full workflow execution flow - Use when you need workflow_spec and workflow_steps data - Do NOT use if you only need basic metadata (use other workflow tools instead) + - Do NOT use if you need workflow_run (execution) information Args: id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). Returns: Dict[str, Any]: Dictionary containing: - - workflow: The workflow dictionary with comprehensive metadata, or None if not found. + - workflow: The workflow (WorkflowTemplate) dictionary with comprehensive metadata, or None if not found. The workflow dictionary contains: - Run Metadata: - - run_id: Unique identifier for this workflow run - - run_phase: Execution phase (Succeeded, Running, Failed, etc.) 
- - run_started_at: When the workflow run started (ISO timestamp) - - run_finished_at: When the workflow run finished (ISO timestamp) - - run_estimated_duration: Estimated execution duration - - run_progress: Progress indicator (e.g., "1/3") - - run_cpu_usage: CPU resource usage duration - - run_memory_usage: Memory resource usage duration - - Workflow Metadata: - - workflow_id: Reference to the workflow template used - - workflow_package_name: Package identifier - - workflow_cron_schedule: Cron schedule if scheduled - - workflow_cron_timezone: Timezone for cron schedule - - workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information - - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information - - workflow_creation_timestamp: When the workflow was created - - workflow_archiving_status: Archiving status - - - Full workflow specification including all steps, transformations, and execution details + - template_name: Name of the template + - package_name, package_version: Package information + - source_system, source_category, workflow_type: Source information + - certified, verified: Certification status + - creator_email, creator_username: Creator information + - modifier_email, modifier_username: Last modifier information + - creation_timestamp: When template was created + - package_author, package_description, package_repository: Package metadata + - workflow_steps: Workflow steps (when include_dag=True) + - workflow_spec: Full workflow specification (when include_dag=True) - error: None if no error occurred, otherwise the error message Examples: # Get a specific workflow by ID to see full workflow instructions result = get_workflow_by_id_tool("atlan-snowflake-miner-1714638976") - # Check if workflow succeeded - if result.get("workflow", {}).get("run_phase") == "Succeeded": - print("Workflow completed successfully") - # Access full workflow steps and transformations workflow = 
result.get("workflow") - # The workflow object contains complete workflow definition + # The workflow object contains complete workflow definition with workflow_steps and workflow_spec """ return get_workflow_by_id(id=id) @@ -1084,48 +1045,48 @@ def get_workflow_runs_tool( size: int = 100, ) -> Dict[str, Any]: """ - Retrieve all workflow runs for a specific workflow and phase. + Retrieve all workflow_runs for a specific workflow and phase. Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what users see as "Workflow" in UI) - - "Workflow" (kind="Workflow") = Executed instance/run (what users see as "WorkflowRun" in UI) + - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) + - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - This tool returns workflow run instances (kind="Workflow") that match the specified workflow template name and phase. - Each run contains execution details, timing, resource usage, and status information. + This tool returns workflow_run instances (kind="Workflow") that match the specified workflow name and phase. + Each workflow_run contains execution details, timing, resource usage, and status information. IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow instructions, steps, or transformations. To get complete workflow details including all steps and transformations, use get_workflow_by_id_tool instead. When to use this tool: - - Use when you need to find all runs of a specific workflow template by execution phase - - Use when you need execution metadata (status, timing, resource usage) for multiple runs - - Use when you need to filter runs by phase (Succeeded, Failed, Running, etc.) 
+ - Use when you need to find all workflow_runs of a specific workflow by execution phase + - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs + - Use when you need to filter workflow_runs by phase (Succeeded, Failed, Running, etc.) - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) Args: - workflow_name (str): Name of the workflow template as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). - This refers to the WorkflowTemplate name, not individual run IDs. - workflow_phase (str): Phase of the workflow. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. + workflow_name (str): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). + This refers to the workflow name, not individual workflow_run IDs. + workflow_phase (str): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. Case-insensitive matching is supported. from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. size (int, optional): Maximum number of search results to return. Defaults to 100. Returns: Dict[str, Any]: Dictionary containing: - - runs: List of workflow run dictionaries. Each run contains: + - runs: List of workflow_run dictionaries. Each workflow_run contains: Run Metadata: - - run_id: Unique identifier for this workflow run + - run_id: Unique identifier for this workflow_run - run_phase: Execution phase (Succeeded, Running, Failed, etc.) 
- - run_started_at: When the workflow run started (ISO timestamp) - - run_finished_at: When the workflow run finished (ISO timestamp, None if still running) + - run_started_at: When the workflow_run started (ISO timestamp) + - run_finished_at: When the workflow_run finished (ISO timestamp, None if still running) - run_estimated_duration: Estimated execution duration - run_progress: Progress indicator (e.g., "1/3") - run_cpu_usage: CPU resource usage duration - run_memory_usage: Memory resource usage duration Workflow Metadata: - - workflow_id: Reference to the workflow template used + - workflow_id: Reference to the workflow (template) used - workflow_package_name: Package identifier - workflow_cron_schedule: Cron schedule if scheduled - workflow_cron_timezone: Timezone for cron schedule @@ -1133,24 +1094,24 @@ def get_workflow_runs_tool( - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information - workflow_creation_timestamp: When the workflow was created - workflow_archiving_status: Archiving status - - total: Total count of runs matching the criteria + - total: Total count of workflow_runs matching the criteria - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded workflow runs + # Get succeeded workflow_runs result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Succeeded") - # Get running workflows + # Get running workflow_runs result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Running") - # Get failed workflows with pagination + # Get failed workflow_runs with pagination result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Failed", from_=0, size=50) - # Analyze run durations + # Analyze workflow_run durations runs = result.get("runs", []) for run in runs: if run.get("run_finished_at") and run.get("run_started_at"): - print(f"Run {run['run_id']} took {run.get('run_estimated_duration')}") + print(f"Workflow run {run['run_id']} took 
{run.get('run_estimated_duration')}") """ return get_workflow_runs( workflow_name=workflow_name, @@ -1160,71 +1121,6 @@ def get_workflow_runs_tool( ) -@mcp.tool() -def get_workflow_run_by_id_tool(id: str) -> Dict[str, Any]: - """ - Find a workflow run by its ID. - - Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what users see as "Workflow" in UI) - - "Workflow" (kind="Workflow") = Executed instance/run (what users see as "WorkflowRun" in UI) - - This tool retrieves a specific workflow run instance (kind="Workflow") by its unique run ID. - Note: Only workflow runs (executed instances) will be found, not templates. - - IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow - instructions, steps, or transformations. To get complete workflow details including all steps and - transformations, use get_workflow_by_id_tool instead. - - When to use this tool: - - Use when you have a specific workflow run ID and need its execution details - - Use when you need metadata for a single run (status, timing, resource usage) - - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) - - Args: - id (str): The ID of the workflow run to find (e.g., 'atlan-snowflake-miner-1714638976-t7s8b'). - Workflow run IDs typically include the workflow name followed by a unique suffix. - - Returns: - Dict[str, Any]: Dictionary containing: - - run: The workflow run dictionary with comprehensive execution details, or None if not found. - The run dictionary contains: - Run Metadata: - - run_id: Unique identifier for this workflow run - - run_phase: Execution phase (Succeeded, Running, Failed, Error, etc.) 
- - run_started_at: When the workflow run started (ISO timestamp) - - run_finished_at: When the workflow run finished (ISO timestamp, None if still running) - - run_estimated_duration: Estimated execution duration - - run_progress: Progress indicator showing completed steps (e.g., "1/3") - - run_cpu_usage: CPU resource usage duration - - run_memory_usage: Memory resource usage duration - - Workflow Metadata: - - workflow_id: Reference to the workflow template that was used - - workflow_package_name: Package identifier - - workflow_cron_schedule: Cron schedule if scheduled - - workflow_cron_timezone: Timezone for cron schedule - - workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information - - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information - - workflow_creation_timestamp: When the workflow was created - - workflow_archiving_status: Whether the workflow is archived - - error: None if no error occurred, otherwise the error message - - Examples: - # Get a specific workflow run - result = get_workflow_run_by_id_tool("atlan-snowflake-miner-1714638976-t7s8b") - - # Check run status - run = result.get("run") - if run: - print(f"Status: {run.get('run_phase')}") - print(f"Duration: {run.get('run_estimated_duration')}") - if run.get('run_phase') == 'Failed': - print(f"Run ID: {run.get('run_id')}") - """ - return get_workflow_run_by_id(id=id) - - @mcp.tool() def get_workflow_runs_by_status_and_time_range_tool( status: List[str], @@ -1234,13 +1130,13 @@ def get_workflow_runs_by_status_and_time_range_tool( size: int = 100, ) -> Dict[str, Any]: """ - Retrieve workflow runs based on their status and time range. + Retrieve workflow_runs based on their status and time range. 
Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what users see as "Workflow" in UI) - - "Workflow" (kind="Workflow") = Executed instance/run (what users see as "WorkflowRun" in UI) + - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) + - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - This tool allows you to search for workflow run instances (kind="Workflow") by filtering on their execution status + This tool allows you to search for workflow_run instances (kind="Workflow") by filtering on their execution status (e.g., Succeeded, Failed, Running) and optionally by time ranges. This is useful for monitoring, debugging, and analyzing workflow execution patterns. @@ -1249,45 +1145,45 @@ def get_workflow_runs_by_status_and_time_range_tool( transformations, use get_workflow_by_id_tool instead. When to use this tool: - - Use when you need to find runs across multiple workflows filtered by status and time + - Use when you need to find workflow_runs across multiple workflows filtered by status and time - Use when monitoring workflow execution patterns or analyzing failures - - Use when you need execution metadata for runs within a specific time window + - Use when you need execution metadata for workflow_runs within a specific time window - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) Args: - status (List[str]): List of workflow phases to filter by. Common values: - - 'Succeeded': Successfully completed workflows - - 'Failed': Workflows that failed - - 'Running': Currently executing workflows - - 'Error': Workflows that encountered errors - - 'Pending': Workflows waiting to start + status (List[str]): List of workflow_run phases to filter by. 
Common values: + - 'Succeeded': Successfully completed workflow_runs + - 'Failed': Workflow_runs that failed + - 'Running': Currently executing workflow_runs + - 'Error': Workflow_runs that encountered errors + - 'Pending': Workflow_runs waiting to start Case-insensitive matching is supported. Example: ['Succeeded', 'Failed'] started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' - ISO 8601 format: '2024-01-01T00:00:00Z' - Filters workflows that started at or after this time. + Filters workflow_runs that started at or after this time. finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts: - Relative time format: 'now-1h', 'now-12h' - ISO 8601 format: '2024-01-01T00:00:00Z' - Filters workflows that finished at or after this time. + Filters workflow_runs that finished at or after this time. from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. size (int, optional): Maximum number of search results to return. Defaults to 100. Returns: Dict[str, Any]: Dictionary containing: - - runs: List of workflow run dictionaries matching the criteria. Each run contains: + - runs: List of workflow_run dictionaries matching the criteria. Each workflow_run contains: Run Metadata: - - run_id: Unique identifier for this workflow run + - run_id: Unique identifier for this workflow_run - run_phase: Execution phase (Succeeded, Running, Failed, etc.) 
- - run_started_at: When the workflow run started (ISO timestamp) - - run_finished_at: When the workflow run finished (ISO timestamp, None if still running) + - run_started_at: When the workflow_run started (ISO timestamp) + - run_finished_at: When the workflow_run finished (ISO timestamp, None if still running) - run_estimated_duration: Estimated execution duration - run_progress: Progress indicator (e.g., "1/3") - run_cpu_usage: CPU resource usage duration - run_memory_usage: Memory resource usage duration Workflow Metadata: - - workflow_id: Reference to the workflow template used + - workflow_id: Reference to the workflow (template) used - workflow_package_name: Package identifier - workflow_cron_schedule: Cron schedule if scheduled - workflow_cron_timezone: Timezone for cron schedule @@ -1295,17 +1191,17 @@ def get_workflow_runs_by_status_and_time_range_tool( - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information - workflow_creation_timestamp: When the workflow was created - workflow_archiving_status: Archiving status - - total: Total count of runs matching the criteria (may be larger than the returned list if paginated) + - total: Total count of workflow_runs matching the criteria (may be larger than the returned list if paginated) - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded runs from the last 2 hours + # Get succeeded workflow_runs from the last 2 hours result = get_workflow_runs_by_status_and_time_range_tool( status=["Succeeded"], started_at="now-2h" ) - # Get failed runs from the last 24 hours + # Get failed workflow_runs from the last 24 hours result = get_workflow_runs_by_status_and_time_range_tool( status=["Failed"], started_at="now-24h" @@ -1318,14 +1214,14 @@ def get_workflow_runs_by_status_and_time_range_tool( finished_at="now-1h" ) - # Get running workflows + # Get running workflow_runs result = get_workflow_runs_by_status_and_time_range_tool( 
status=["Running"] ) # Analyze failure rates failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"] - print(f"Found {len(failed_runs)} failed runs in the time range") + print(f"Found {len(failed_runs)} failed workflow_runs in the time range") """ return get_workflow_runs_by_status_and_time_range( status=status, diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index bc7c3533..0363d3d3 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -22,7 +22,6 @@ get_workflows_by_type, get_workflow_by_id, get_workflow_runs, - get_workflow_run_by_id, get_workflow_runs_by_status_and_time_range, ) @@ -39,7 +38,6 @@ "get_workflows_by_type", "get_workflow_by_id", "get_workflow_runs", - "get_workflow_run_by_id", "get_workflow_runs_by_status_and_time_range", "CertificateStatus", "UpdatableAttribute", diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py index 4aa9de2c..a3a875a8 100644 --- a/modelcontextprotocol/tools/workflows.py +++ b/modelcontextprotocol/tools/workflows.py @@ -2,12 +2,12 @@ Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: -- "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what users see as "Workflow" in UI) -- "Workflow" (kind="Workflow") = Executed instance/run (what users see as "WorkflowRun" in UI) +- "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) +- "Workflow" (kind="Workflow") = workflow_run (executed instance/run) Throughout this module: -- Functions dealing with "WorkflowTemplate" handle template definitions -- Functions dealing with "Workflow" handle executed instances/runs +- "workflow" refers to WorkflowTemplate (kind="WorkflowTemplate") - the template definition +- "workflow_run" refers to Workflow (kind="Workflow") - an executed instance/run - The extract_workflow_metadata() function returns fields prefixed 
with "run_*" for execution data and "workflow_*" for workflow-level metadata to clearly distinguish between the two. """ @@ -28,20 +28,20 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str, Any]]: """ - Process a workflow object into a standardized dictionary format. + Process a workflow or workflow_run object into a standardized dictionary format. Note: There is a naming confusion in the Argo/Kubernetes API: - - "WorkflowTemplate" (kind="WorkflowTemplate") = Template definition (what we call "Workflow" in UI) - - "Workflow" (kind="Workflow") = Executed instance/run (what we call "WorkflowRun" in UI) + - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) + - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) This function handles both types and returns standardized metadata dictionaries. Args: - result: The workflow object from PyAtlan (WorkflowSearchResult or Workflow). - include_dag: If True, includes the workflow DAG in the output. This only applies if the input is a WorkflowTemplate. - For Workflow instances (runs), this parameter has no effect. + result: The workflow or workflow_run object from PyAtlan (WorkflowSearchResult or Workflow). + include_dag: If True, includes the workflow DAG in the output. This only applies if the input is a workflow (WorkflowTemplate). + For workflow_run instances, this parameter has no effect. Returns: - Optional[Dict[str, Any]]: Serialized workflow dictionary using Pydantic's dict() method. + Optional[Dict[str, Any]]: Serialized workflow or workflow_run dictionary using Pydantic's dict() method. Returns None for unsupported workflow types. Returns empty dict {} if result is None or conversion fails. 
""" if result is None: @@ -68,18 +68,18 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str def extract_workflow_template_metadata(dict_repr: Dict[str, Any], include_dag: bool = False) -> Dict[str, Any]: """ - Extract useful metadata from an Argo WorkflowTemplate. + Extract useful metadata from a workflow (WorkflowTemplate). - Note: In Argo terminology, "WorkflowTemplate" is the template definition (what users see as "Workflow" in the UI). - This function extracts template metadata including package info, source system, certification status, etc. + Note: In Argo terminology, "WorkflowTemplate" (kind="WorkflowTemplate") is a workflow (template definition). + This function extracts workflow metadata including package info, source system, certification status, etc. Args: - dict_repr: The workflow template object from the workflows array (kind="WorkflowTemplate") + dict_repr: The workflow object from the workflows array (kind="WorkflowTemplate") include_dag: If True, includes the workflow DAG (workflow_steps and workflow_spec) in the output. When False, returns only essential metadata fields. Returns: - Dictionary containing extracted template metadata with fields like: + Dictionary containing extracted workflow metadata with fields like: - template_name, package_name, package_version - source_system, source_category, workflow_type - certified, verified flags @@ -121,20 +121,20 @@ def extract_workflow_template_metadata(dict_repr: Dict[str, Any], include_dag: b def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: """ - Extract comprehensive metadata from a Workflow (executed instance/run). + Extract comprehensive metadata from a workflow_run (Workflow executed instance/run). - Note: In Argo terminology, "Workflow" (kind="Workflow") is an executed instance/run - (what users see as "WorkflowRun" in the UI). This is different from "WorkflowTemplate". 
+ Note: In Argo terminology, "Workflow" (kind="Workflow") is a workflow_run (executed instance/run). + This is different from "WorkflowTemplate" which is a workflow (template definition). The returned dictionary uses a two-tier naming convention: - Fields prefixed with "run_*" contain execution-specific metadata (phase, timing, resource usage) - Fields prefixed with "workflow_*" contain workflow-level metadata (template reference, package, creator) Args: - dict_repr: The workflow object dictionary representation (kind="Workflow") + dict_repr: The workflow_run object dictionary representation (kind="Workflow") Returns: - Dictionary containing comprehensive workflow run metadata with fields: + Dictionary containing comprehensive workflow_run metadata with fields: Run Metadata (execution-specific): - run_id: Unique identifier for this run - run_phase: Execution phase (Succeeded, Running, Failed, etc.) @@ -143,7 +143,7 @@ def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: - run_cpu_usage, run_memory_usage: Resource usage Workflow Metadata (workflow-level): - - workflow_id: Reference to the workflow template used + - workflow_id: Reference to the workflow (template) used - workflow_package_name: Package identifier - workflow_cron_schedule, workflow_cron_timezone: Scheduling info - workflow_creator_*, workflow_modifier_*: Ownership information @@ -191,7 +191,7 @@ def get_workflow_package_names() -> List[str]: def get_workflows_by_type(workflow_package_name: Union[WorkflowPackage, str], max_results: int = 10) -> Dict[str, Any]: """ - Retrieve workflows by type (workflow package name). Note: Only workflows that have been run will be found. + Retrieve workflows (WorkflowTemplate) by type (workflow package name). Args: workflow_package_name (Union[WorkflowPackage, str]): Workflow package type (e.g., WorkflowPackage.SNOWFLAKE or "atlan-snowflake"). 
@@ -199,7 +199,7 @@ def get_workflows_by_type(workflow_package_name: Union[WorkflowPackage, str], ma Returns: Dict[str, Any]: Dictionary containing: - - workflows: List of workflows with their configurations + - workflows: List of workflows (WorkflowTemplate) with their configurations - total: Total count of workflows - error: None if no error occurred, otherwise the error message @@ -247,14 +247,14 @@ def get_workflows_by_type(workflow_package_name: Union[WorkflowPackage, str], ma def get_workflow_by_id(id: str) -> Dict[str, Any]: """ - Retrieve a specific workflow by its ID. Note: Only workflows that have been run will be found. + Retrieve a specific workflow (WorkflowTemplate) by its ID. Args: id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). Returns: Dict[str, Any]: Dictionary containing: - - workflow: The workflow object with its configuration, or None if not found + - workflow: The workflow (WorkflowTemplate) object with its configuration, or None if not found - error: None if no error occurred, otherwise the error message Examples: @@ -304,26 +304,26 @@ def get_workflow_runs( size: int = 100, ) -> Dict[str, Any]: """ - Retrieve all workflow runs for a specific workflow and phase. + Retrieve all workflow_runs for a specific workflow and phase. Args: - workflow_name (str): Name of the workflow as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). - workflow_phase (Union[AtlanWorkflowPhase, str]): Phase of the workflow (e.g., Succeeded, Running, Failed). + workflow_name (str): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). + workflow_phase (Union[AtlanWorkflowPhase, str]): Phase of the workflow_run (e.g., Succeeded, Running, Failed). from_ (int, optional): Starting index of the search results. Defaults to 0. size (int, optional): Maximum number of search results to return. Defaults to 100. 
Returns: Dict[str, Any]: Dictionary containing: - - runs: List of workflow runs with their details - - total: Total count of runs + - runs: List of workflow_runs with their details + - total: Total count of workflow_runs - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded workflow runs + # Get succeeded workflow_runs from pyatlan.model.enums import AtlanWorkflowPhase result = get_workflow_runs("atlan-snowflake-miner-1714638976", AtlanWorkflowPhase.SUCCESS) - # Get running workflows + # Get running workflow_runs result = get_workflow_runs("atlan-snowflake-miner-1714638976", "Running") """ logger.info(f"Retrieving workflow runs: workflow_name={workflow_name}, phase={workflow_phase}, from_={from_}, size={size}") @@ -399,60 +399,6 @@ def get_workflow_runs( } -def get_workflow_run_by_id(id: str) -> Dict[str, Any]: - """ - Find workflow runs based on their ID. Note: Only workflow runs will be found. - - Args: - id (str): The ID of the workflow run to find (e.g., 'atlan-snowflake-miner-1714638976-t7s8b'). 
- - Returns: - Dict[str, Any]: Dictionary containing: - - run: The workflow run object with its details, or None if not found - - error: None if no error occurred, otherwise the error message - - Examples: - # Get a specific workflow run - result = get_workflow_run_by_id("atlan-snowflake-miner-1714638976-t7s8b") - """ - logger.info(f"Retrieving workflow run with ID: {id}") - - try: - if not id: - raise ValueError("id cannot be empty") - - client = get_atlan_client() - - # Retrieve workflow run using the pyatlan SDK - logger.debug(f"Calling client.workflow.find_run_by_id() with id={id}") - run = client.workflow.find_run_by_id(id=id) - - if run is None: - logger.warning(f"Workflow run with ID '{id}' not found") - return { - "run": None, - "error": f"Workflow run with ID '{id}' not found", - } - - logger.info(f"Successfully retrieved workflow run with ID: {id}") - - # Process the run result using _result_to_dict for proper serialization - processed_run = _result_to_dict(run, include_dag=True) - - return { - "run": processed_run, - "error": None, - } - - except Exception as e: - error_msg = f"Failed to retrieve workflow run with ID '{id}': {str(e)}" - logger.error(error_msg, exc_info=True) - return { - "run": None, - "error": error_msg, - } - - def get_workflow_runs_by_status_and_time_range( status: Union[List[AtlanWorkflowPhase], List[str]], started_at: Optional[str] = None, @@ -461,10 +407,10 @@ def get_workflow_runs_by_status_and_time_range( size: int = 100, ) -> Dict[str, Any]: """ - Retrieve workflow runs based on their status and time range. + Retrieve workflow_runs based on their status and time range. Args: - status (Union[List[AtlanWorkflowPhase], List[str]]): List of workflow phases to filter by + status (Union[List[AtlanWorkflowPhase], List[str]]): List of workflow_run phases to filter by (e.g., ['Succeeded', 'Failed'] or [AtlanWorkflowPhase.SUCCESS, AtlanWorkflowPhase.FAILED]). 
started_at (str, optional): Lower bound on 'status.startedAt' (e.g., 'now-2h', '2024-01-01T00:00:00Z'). finished_at (str, optional): Lower bound on 'status.finishedAt' (e.g., 'now-1h', '2024-01-01T00:00:00Z'). @@ -473,19 +419,19 @@ def get_workflow_runs_by_status_and_time_range( Returns: Dict[str, Any]: Dictionary containing: - - runs: List of workflow runs with their details - - total: Total count of runs + - runs: List of workflow_runs with their details + - total: Total count of workflow_runs - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded runs from the last 2 hours + # Get succeeded workflow_runs from the last 2 hours from pyatlan.model.enums import AtlanWorkflowPhase result = get_workflow_runs_by_status_and_time_range( status=[AtlanWorkflowPhase.SUCCESS], started_at="now-2h" ) - # Get failed runs with both time filters + # Get failed workflow_runs with both time filters result = get_workflow_runs_by_status_and_time_range( status=["Failed"], started_at="now-24h", From ab4cdb4250c32cdc7aed2d7affea2ae58f8cdcc9 Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 26 Nov 2025 07:48:53 -0800 Subject: [PATCH 03/11] consolidated workflow runs functions --- modelcontextprotocol/server.py | 160 +++++---------- modelcontextprotocol/tools/__init__.py | 2 - modelcontextprotocol/tools/workflows.py | 261 ++++++++++-------------- 3 files changed, 157 insertions(+), 266 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 563b70bd..ad71051c 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -16,7 +16,6 @@ get_workflows_by_type, get_workflow_by_id, get_workflow_runs, - get_workflow_runs_by_status_and_time_range, UpdatableAttribute, CertificateStatus, UpdatableAsset, @@ -1039,139 +1038,69 @@ def get_workflow_by_id_tool(id: str) -> Dict[str, Any]: @mcp.tool() def get_workflow_runs_tool( - workflow_name: str, - workflow_phase: str, - from_: int = 0, - size: 
int = 100, -) -> Dict[str, Any]: - """ - Retrieve all workflow_runs for a specific workflow and phase. - - Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - - This tool returns workflow_run instances (kind="Workflow") that match the specified workflow name and phase. - Each workflow_run contains execution details, timing, resource usage, and status information. - - IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow - instructions, steps, or transformations. To get complete workflow details including all steps and - transformations, use get_workflow_by_id_tool instead. - - When to use this tool: - - Use when you need to find all workflow_runs of a specific workflow by execution phase - - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs - - Use when you need to filter workflow_runs by phase (Succeeded, Failed, Running, etc.) - - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) - - Args: - workflow_name (str): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). - This refers to the workflow name, not individual workflow_run IDs. - workflow_phase (str): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. - Case-insensitive matching is supported. - from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. - size (int, optional): Maximum number of search results to return. Defaults to 100. - - Returns: - Dict[str, Any]: Dictionary containing: - - runs: List of workflow_run dictionaries. 
Each workflow_run contains: - Run Metadata: - - run_id: Unique identifier for this workflow_run - - run_phase: Execution phase (Succeeded, Running, Failed, etc.) - - run_started_at: When the workflow_run started (ISO timestamp) - - run_finished_at: When the workflow_run finished (ISO timestamp, None if still running) - - run_estimated_duration: Estimated execution duration - - run_progress: Progress indicator (e.g., "1/3") - - run_cpu_usage: CPU resource usage duration - - run_memory_usage: Memory resource usage duration - - Workflow Metadata: - - workflow_id: Reference to the workflow (template) used - - workflow_package_name: Package identifier - - workflow_cron_schedule: Cron schedule if scheduled - - workflow_cron_timezone: Timezone for cron schedule - - workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information - - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information - - workflow_creation_timestamp: When the workflow was created - - workflow_archiving_status: Archiving status - - total: Total count of workflow_runs matching the criteria - - error: None if no error occurred, otherwise the error message - - Examples: - # Get succeeded workflow_runs - result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Succeeded") - - # Get running workflow_runs - result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Running") - - # Get failed workflow_runs with pagination - result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Failed", from_=0, size=50) - - # Analyze workflow_run durations - runs = result.get("runs", []) - for run in runs: - if run.get("run_finished_at") and run.get("run_started_at"): - print(f"Workflow run {run['run_id']} took {run.get('run_estimated_duration')}") - """ - return get_workflow_runs( - workflow_name=workflow_name, - workflow_phase=workflow_phase, - from_=from_, - size=size, - ) - - -@mcp.tool() -def 
get_workflow_runs_by_status_and_time_range_tool( - status: List[str], + workflow_name: str | None = None, + workflow_phase: str | None = None, + status: List[str] | None = None, started_at: str | None = None, finished_at: str | None = None, from_: int = 0, size: int = 100, ) -> Dict[str, Any]: """ - Retrieve workflow_runs based on their status and time range. + Retrieve workflow_runs with flexible filtering options. Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - This tool allows you to search for workflow_run instances (kind="Workflow") by filtering on their execution status - (e.g., Succeeded, Failed, Running) and optionally by time ranges. This is useful for - monitoring, debugging, and analyzing workflow execution patterns. + This tool supports two main use cases: + 1. Filter by specific workflow name and phase (single workflow) + 2. Filter across all workflows by status list and optional time ranges + + Each workflow_run contains execution details, timing, resource usage, and status information. IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow instructions, steps, or transformations. To get complete workflow details including all steps and transformations, use get_workflow_by_id_tool instead. 
When to use this tool: + - Use when you need to find all workflow_runs of a specific workflow by execution phase - Use when you need to find workflow_runs across multiple workflows filtered by status and time - Use when monitoring workflow execution patterns or analyzing failures - - Use when you need execution metadata for workflow_runs within a specific time window + - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) Args: - status (List[str]): List of workflow_run phases to filter by. Common values: + workflow_name (str, optional): Name of the workflow (template) as displayed in the UI + (e.g., 'atlan-snowflake-miner-1714638976'). If provided, filters runs for that specific workflow. + This refers to the workflow name, not individual workflow_run IDs. + workflow_phase (str, optional): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. + Case-insensitive matching is supported. Required if workflow_name is provided and status is not. + status (List[str], optional): List of workflow_run phases to filter by. Common values: - 'Succeeded': Successfully completed workflow_runs - 'Failed': Workflow_runs that failed - 'Running': Currently executing workflow_runs - 'Error': Workflow_runs that encountered errors - 'Pending': Workflow_runs waiting to start - Case-insensitive matching is supported. Example: ['Succeeded', 'Failed'] + Case-insensitive matching is supported. Required if workflow_name is not provided. + Can also be used with workflow_name (will use first item from list as workflow_phase). + Example: ['Succeeded', 'Failed'] started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' - ISO 8601 format: '2024-01-01T00:00:00Z' Filters workflow_runs that started at or after this time. 
+ Only used when workflow_name is not provided. finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts: - Relative time format: 'now-1h', 'now-12h' - ISO 8601 format: '2024-01-01T00:00:00Z' Filters workflow_runs that finished at or after this time. + Only used when workflow_name is not provided. from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. size (int, optional): Maximum number of search results to return. Defaults to 100. Returns: Dict[str, Any]: Dictionary containing: - - runs: List of workflow_run dictionaries matching the criteria. Each workflow_run contains: + - runs: List of workflow_run dictionaries. Each workflow_run contains: Run Metadata: - run_id: Unique identifier for this workflow_run - run_phase: Execution phase (Succeeded, Running, Failed, etc.) @@ -1195,35 +1124,44 @@ def get_workflow_runs_by_status_and_time_range_tool( - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded workflow_runs from the last 2 hours - result = get_workflow_runs_by_status_and_time_range_tool( - status=["Succeeded"], - started_at="now-2h" - ) + # Get succeeded workflow_runs for a specific workflow + result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", workflow_phase="Succeeded") - # Get failed workflow_runs from the last 24 hours - result = get_workflow_runs_by_status_and_time_range_tool( - status=["Failed"], - started_at="now-24h" - ) + # Get running workflow_runs for a specific workflow + result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", workflow_phase="Running") + + # Get failed workflow_runs with pagination + result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", workflow_phase="Failed", from_=0, size=50) + + # Get succeeded workflow_runs from the last 2 hours (across all workflows) + result = get_workflow_runs_tool(status=["Succeeded"], started_at="now-2h") - # Get multiple statuses with both time filters - result = 
get_workflow_runs_by_status_and_time_range_tool( + # Get failed workflow_runs from the last 24 hours (across all workflows) + result = get_workflow_runs_tool(status=["Failed"], started_at="now-24h") + + # Get multiple statuses with both time filters (across all workflows) + result = get_workflow_runs_tool( status=["Succeeded", "Failed"], started_at="now-7d", finished_at="now-1h" ) - # Get running workflow_runs - result = get_workflow_runs_by_status_and_time_range_tool( - status=["Running"] - ) + # Get running workflow_runs (across all workflows) + result = get_workflow_runs_tool(status=["Running"]) + + # Analyze workflow_run durations + runs = result.get("runs", []) + for run in runs: + if run.get("run_finished_at") and run.get("run_started_at"): + print(f"Workflow run {run['run_id']} took {run.get('run_estimated_duration')}") # Analyze failure rates failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"] print(f"Found {len(failed_runs)} failed workflow_runs in the time range") """ - return get_workflow_runs_by_status_and_time_range( + return get_workflow_runs( + workflow_name=workflow_name, + workflow_phase=workflow_phase, status=status, started_at=started_at, finished_at=finished_at, diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index 0363d3d3..2c9ef278 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -22,7 +22,6 @@ get_workflows_by_type, get_workflow_by_id, get_workflow_runs, - get_workflow_runs_by_status_and_time_range, ) __all__ = [ @@ -38,7 +37,6 @@ "get_workflows_by_type", "get_workflow_by_id", "get_workflow_runs", - "get_workflow_runs_by_status_and_time_range", "CertificateStatus", "UpdatableAttribute", "UpdatableAsset", diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py index a3a875a8..18fb9589 100644 --- a/modelcontextprotocol/tools/workflows.py +++ 
b/modelcontextprotocol/tools/workflows.py @@ -298,122 +298,37 @@ def get_workflow_by_id(id: str) -> Dict[str, Any]: def get_workflow_runs( - workflow_name: str, - workflow_phase: Union[AtlanWorkflowPhase, str], - from_: int = 0, - size: int = 100, -) -> Dict[str, Any]: - """ - Retrieve all workflow_runs for a specific workflow and phase. - - Args: - workflow_name (str): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). - workflow_phase (Union[AtlanWorkflowPhase, str]): Phase of the workflow_run (e.g., Succeeded, Running, Failed). - from_ (int, optional): Starting index of the search results. Defaults to 0. - size (int, optional): Maximum number of search results to return. Defaults to 100. - - Returns: - Dict[str, Any]: Dictionary containing: - - runs: List of workflow_runs with their details - - total: Total count of workflow_runs - - error: None if no error occurred, otherwise the error message - - Examples: - # Get succeeded workflow_runs - from pyatlan.model.enums import AtlanWorkflowPhase - result = get_workflow_runs("atlan-snowflake-miner-1714638976", AtlanWorkflowPhase.SUCCESS) - - # Get running workflow_runs - result = get_workflow_runs("atlan-snowflake-miner-1714638976", "Running") - """ - logger.info(f"Retrieving workflow runs: workflow_name={workflow_name}, phase={workflow_phase}, from_={from_}, size={size}") - - try: - if not workflow_name: - raise ValueError("workflow_name cannot be empty") - if not workflow_phase: - raise ValueError("workflow_phase cannot be empty") - - # Convert string to AtlanWorkflowPhase enum if needed - if isinstance(workflow_phase, str): - try: - workflow_phase = AtlanWorkflowPhase(workflow_phase) - except ValueError: - # Try case-insensitive match - for phase in AtlanWorkflowPhase: - if phase.value.upper() == workflow_phase.upper(): - workflow_phase = phase - break - else: - raise ValueError(f"Invalid workflow phase: {workflow_phase}") - - client = get_atlan_client() - - # 
Retrieve workflow runs using the pyatlan SDK - logger.debug(f"Calling client.workflow.get_runs() with workflow_name={workflow_name}, workflow_phase={workflow_phase}") - response = client.workflow.get_runs( - workflow_name=workflow_name, - workflow_phase=workflow_phase, - from_=from_, - size=size, - ) - - logger.info("Successfully retrieved workflow runs") - - # Process the response - runs = [] - total = 0 - if response and response.hits and response.hits.hits: - runs = response.hits.hits - # Handle total - it can be a dict with 'value' key or an object with .value attribute - total_obj = response.hits.total - if total_obj: - if isinstance(total_obj, dict): - total = total_obj.get("value", len(runs)) - elif hasattr(total_obj, "value"): - total = total_obj.value - else: - total = len(runs) - else: - total = len(runs) - - # Extract key run information using _result_to_dict for proper serialization - processed_runs = [] - for run in runs: - processed_run = _result_to_dict(run) - processed_runs.append(processed_run) - - return { - "runs": processed_runs, - "total": total, - "error": None, - } - - except Exception as e: - error_msg = f"Failed to retrieve workflow runs: {str(e)}" - logger.error(error_msg, exc_info=True) - return { - "runs": [], - "total": 0, - "error": error_msg, - } - - -def get_workflow_runs_by_status_and_time_range( - status: Union[List[AtlanWorkflowPhase], List[str]], + workflow_name: Optional[str] = None, + workflow_phase: Optional[Union[AtlanWorkflowPhase, str]] = None, + status: Optional[Union[List[AtlanWorkflowPhase], List[str]]] = None, started_at: Optional[str] = None, finished_at: Optional[str] = None, from_: int = 0, size: int = 100, ) -> Dict[str, Any]: """ - Retrieve workflow_runs based on their status and time range. + Retrieve workflow_runs with flexible filtering options. + + This function supports two main use cases: + 1. Filter by specific workflow name and phase (single workflow) + 2. 
Filter across all workflows by status list and optional time ranges Args: - status (Union[List[AtlanWorkflowPhase], List[str]]): List of workflow_run phases to filter by - (e.g., ['Succeeded', 'Failed'] or [AtlanWorkflowPhase.SUCCESS, AtlanWorkflowPhase.FAILED]). - started_at (str, optional): Lower bound on 'status.startedAt' (e.g., 'now-2h', '2024-01-01T00:00:00Z'). - finished_at (str, optional): Lower bound on 'status.finishedAt' (e.g., 'now-1h', '2024-01-01T00:00:00Z'). + workflow_name (str, optional): Name of the workflow (template) as displayed in the UI + (e.g., 'atlan-snowflake-miner-1714638976'). If provided, filters runs for that specific workflow. + workflow_phase (Union[AtlanWorkflowPhase, str], optional): Phase of the workflow_run + (e.g., Succeeded, Running, Failed). Required if workflow_name is provided and status is not. + status (Union[List[AtlanWorkflowPhase], List[str]], optional): List of workflow_run phases to filter by + (e.g., ['Succeeded', 'Failed']). Required if workflow_name is not provided. Can also be used + with workflow_name (will use first item from list as workflow_phase). + started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: + - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' + - ISO 8601 format: '2024-01-01T00:00:00Z' + Only used when workflow_name is not provided. + finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts: + - Relative time format: 'now-1h', 'now-12h' + - ISO 8601 format: '2024-01-01T00:00:00Z' + Only used when workflow_name is not provided. from_ (int, optional): Starting index of the search results. Defaults to 0. size (int, optional): Maximum number of search results to return. Defaults to 100. 
@@ -424,68 +339,107 @@ def get_workflow_runs_by_status_and_time_range( - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded workflow_runs from the last 2 hours + # Get succeeded workflow_runs for a specific workflow from pyatlan.model.enums import AtlanWorkflowPhase - result = get_workflow_runs_by_status_and_time_range( - status=[AtlanWorkflowPhase.SUCCESS], - started_at="now-2h" - ) + result = get_workflow_runs("atlan-snowflake-miner-1714638976", workflow_phase=AtlanWorkflowPhase.SUCCESS) - # Get failed workflow_runs with both time filters - result = get_workflow_runs_by_status_and_time_range( + # Get running workflow_runs for a specific workflow + result = get_workflow_runs("atlan-snowflake-miner-1714638976", workflow_phase="Running") + + # Get succeeded workflow_runs from the last 2 hours (across all workflows) + result = get_workflow_runs(status=["Succeeded"], started_at="now-2h") + + # Get failed workflow_runs with both time filters (across all workflows) + result = get_workflow_runs( status=["Failed"], started_at="now-24h", finished_at="now-1h" ) - # Get multiple statuses - result = get_workflow_runs_by_status_and_time_range( - status=["Succeeded", "Failed"], - started_at="now-7d" - ) + # Get multiple statuses (across all workflows) + result = get_workflow_runs(status=["Succeeded", "Failed"], started_at="now-7d") """ logger.info( - f"Retrieving workflow runs by status and time range: status={status}, " - f"started_at={started_at}, finished_at={finished_at}, from_={from_}, size={size}" + f"Retrieving workflow runs: workflow_name={workflow_name}, workflow_phase={workflow_phase}, " + f"status={status}, started_at={started_at}, finished_at={finished_at}, from_={from_}, size={size}" ) try: - if not status: - raise ValueError("status cannot be empty") + client = get_atlan_client() - # Convert string statuses to AtlanWorkflowPhase enums if needed - status_list = [] - for s in status: - if isinstance(s, str): + # Route based on 
whether workflow_name is provided + if workflow_name: + # Use get_runs() API - single workflow, single phase + if not workflow_phase and not status: + raise ValueError("Either workflow_phase or status must be provided when workflow_name is specified") + + # Use workflow_phase if provided, otherwise use first item from status list + phase_to_use = workflow_phase + if not phase_to_use and status: + phase_to_use = status[0] if isinstance(status, list) and len(status) > 0 else None + + if not phase_to_use: + raise ValueError("workflow_phase or status must be provided when workflow_name is specified") + + # Convert string to AtlanWorkflowPhase enum if needed + if isinstance(phase_to_use, str): try: - status_list.append(AtlanWorkflowPhase(s)) + phase_to_use = AtlanWorkflowPhase(phase_to_use) except ValueError: # Try case-insensitive match for phase in AtlanWorkflowPhase: - if phase.value.upper() == s.upper(): - status_list.append(phase) + if phase.value.upper() == phase_to_use.upper(): + phase_to_use = phase break else: - raise ValueError(f"Invalid workflow phase: {s}") - else: - status_list.append(s) - - client = get_atlan_client() - - # Retrieve workflow runs using the pyatlan SDK - logger.debug( - f"Calling client.workflow.find_runs_by_status_and_time_range() with " - f"status={status_list}, started_at={started_at}, finished_at={finished_at}" - ) - response = client.workflow.find_runs_by_status_and_time_range( - status=status_list, - started_at=started_at, - finished_at=finished_at, - from_=from_, - size=size, - ) - - logger.info("Successfully retrieved workflow runs by status and time range") + raise ValueError(f"Invalid workflow phase: {phase_to_use}") + + # Retrieve workflow runs using the pyatlan SDK + logger.debug(f"Calling client.workflow.get_runs() with workflow_name={workflow_name}, workflow_phase={phase_to_use}") + response = client.workflow.get_runs( + workflow_name=workflow_name, + workflow_phase=phase_to_use, + from_=from_, + size=size, + ) + + 
logger.info("Successfully retrieved workflow runs") + else: + # Use find_runs_by_status_and_time_range() API - all workflows, status list, time ranges + if not status: + raise ValueError("status must be provided when workflow_name is not specified") + + # Convert string statuses to AtlanWorkflowPhase enums if needed + status_list = [] + for s in status: + if isinstance(s, str): + try: + status_list.append(AtlanWorkflowPhase(s)) + except ValueError: + # Try case-insensitive match + for phase in AtlanWorkflowPhase: + if phase.value.upper() == s.upper(): + status_list.append(phase) + break + else: + raise ValueError(f"Invalid workflow phase: {s}") + else: + status_list.append(s) + + # Retrieve workflow runs using the pyatlan SDK + logger.debug( + f"Calling client.workflow.find_runs_by_status_and_time_range() with " + f"status={status_list}, started_at={started_at}, finished_at={finished_at}" + ) + response = client.workflow.find_runs_by_status_and_time_range( + status=status_list, + started_at=started_at, + finished_at=finished_at, + from_=from_, + size=size, + ) + + logger.info("Successfully retrieved workflow runs by status and time range") # Process the response runs = [] @@ -517,10 +471,11 @@ def get_workflow_runs_by_status_and_time_range( } except Exception as e: - error_msg = f"Failed to retrieve workflow runs by status and time range: {str(e)}" + error_msg = f"Failed to retrieve workflow runs: {str(e)}" logger.error(error_msg, exc_info=True) return { "runs": [], "total": 0, "error": error_msg, - } \ No newline at end of file + } + From b4b12f7b59b90552d1007b1891b9fb320b389365 Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 26 Nov 2025 08:19:43 -0800 Subject: [PATCH 04/11] Consolidate get_workflows_by_type and get_workflows_by_id into a single tool get_workflows --- modelcontextprotocol/server.py | 132 +++++++++------------- modelcontextprotocol/tools/__init__.py | 6 +- modelcontextprotocol/tools/workflows.py | 142 ++++++++++++------------ 3 files 
changed, 124 insertions(+), 156 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index ad71051c..1e586cb5 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -13,8 +13,7 @@ create_glossary_assets, create_glossary_term_assets, get_workflow_package_names, - get_workflows_by_type, - get_workflow_by_id, + get_workflows, get_workflow_runs, UpdatableAttribute, CertificateStatus, @@ -928,91 +927,49 @@ def get_workflow_package_names_tool() -> List[str]: @mcp.tool() -def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10) -> Dict[str, Any]: - """ - Retrieve workflows (WorkflowTemplate) by workflow package name. - - Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - - IMPORTANT: This tool returns only basic metadata. It does NOT return full workflow - instructions, steps, or transformations. To get complete workflow details including all steps and - transformations, use get_workflow_by_id_tool instead. - - Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. - To get workflow_run information, use functions that start with get_workflow_run. - - When to use this tool: - - Use when you need to list multiple workflows by package type - - Use when you only need metadata (package info, certification status) without full workflow details - - Do NOT use if you need the complete workflow specification with all steps and transformations - - Do NOT use if you need workflow_run (execution) information - - Args: - workflow_package_name (str): The workflow package name (e.g., 'atlan-snowflake', 'atlan-bigquery', 'SNOWFLAKE'). - Can be the full package name like 'atlan-snowflake' or the enum name like 'SNOWFLAKE'. 
- max_results (int, optional): Maximum number of workflows to return. Defaults to 10. - - Returns: - Dict[str, Any]: Dictionary containing: - - workflows: List of workflow (WorkflowTemplate) dictionaries. Each workflow contains: - - template_name: Name of the template - - package_name, package_version: Package information - - source_system, source_category, workflow_type: Source information - - certified, verified: Certification status - - creator_email, creator_username: Creator information - - modifier_email, modifier_username: Last modifier information - - creation_timestamp: When template was created - - package_author, package_description, package_repository: Package metadata - - total: Total count of workflows found - - Examples: - # Get Snowflake workflows - result = get_workflows_by_type_tool("atlan-snowflake") - - # Get BigQuery workflows with custom limit - result = get_workflows_by_type_tool("atlan-bigquery", max_results=50) - - # Get workflows using enum name - result = get_workflows_by_type_tool("SNOWFLAKE") - """ - return get_workflows_by_type(workflow_package_name=workflow_package_name, max_results=max_results) - - -@mcp.tool() -def get_workflow_by_id_tool(id: str) -> Dict[str, Any]: +def get_workflows_tool( + id: str | None = None, + workflow_package_name: str | None = None, + max_results: int = 10, +) -> Dict[str, Any]: """ - Retrieve a specific workflow (WorkflowTemplate) by its ID. + Retrieve workflows (WorkflowTemplate) by ID or by package type. Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - IMPORTANT: This is the ONLY tool that returns full workflow instructions with all steps and transformations. - All other workflow tools (get_workflows_by_type_tool, get_workflow_runs_tool, etc.) 
return only metadata - and execution status, but NOT the complete workflow definition, steps, or transformation details. + This tool supports two main use cases: + 1. Get a specific workflow by ID (returns full workflow details including steps and spec) + 2. Get multiple workflows by package type (returns metadata only, no full details) - This tool uses include_dag=True internally, which means it returns the complete workflow specification - including the workflow DAG (directed acyclic graph), all steps, transformations, and execution details. + IMPORTANT: When getting by ID, this tool returns full workflow instructions with all steps and transformations. + When getting by type, this tool returns only basic metadata to avoid large responses. Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. - To get workflow_run information, use functions that start with get_workflow_run. + To get workflow_run information, use get_workflow_runs_tool. When to use this tool: - - Use when you need the COMPLETE workflow definition with all steps and transformations - - Use when you need to understand the full workflow execution flow - - Use when you need workflow_spec and workflow_steps data - - Do NOT use if you only need basic metadata (use other workflow tools instead) + - Use when you need to get a specific workflow by ID with full details (steps, spec, transformations) + - Use when you need to list multiple workflows by package type (metadata only) + - Use when you need to understand the full workflow execution flow (by ID only) - Do NOT use if you need workflow_run (execution) information Args: - id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). + id (str, optional): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). + If provided, returns a single workflow with full details (workflow_steps and workflow_spec). 
+ Either id or workflow_package_name must be provided, but not both. + workflow_package_name (str, optional): The workflow package name (e.g., 'atlan-snowflake', 'atlan-bigquery', 'SNOWFLAKE'). + Can be the full package name like 'atlan-snowflake' or the enum name like 'SNOWFLAKE'. + If provided, returns list of workflows with metadata only (no workflow_steps or workflow_spec). + Either id or workflow_package_name must be provided, but not both. + max_results (int, optional): Maximum number of workflows to return when getting by type. Defaults to 10. + Only used when workflow_package_name is provided. Returns: Dict[str, Any]: Dictionary containing: - - workflow: The workflow (WorkflowTemplate) dictionary with comprehensive metadata, or None if not found. - The workflow dictionary contains: + - workflows: List of workflow (WorkflowTemplate) dictionaries. When getting by ID, contains single item. + Each workflow contains: - template_name: Name of the template - package_name, package_version: Package information - source_system, source_category, workflow_type: Source information @@ -1021,19 +978,32 @@ def get_workflow_by_id_tool(id: str) -> Dict[str, Any]: - modifier_email, modifier_username: Last modifier information - creation_timestamp: When template was created - package_author, package_description, package_repository: Package metadata - - workflow_steps: Workflow steps (when include_dag=True) - - workflow_spec: Full workflow specification (when include_dag=True) + - workflow_steps: Workflow steps (only when getting by ID) + - workflow_spec: Full workflow specification (only when getting by ID) + - total: Total count of workflows (1 when getting by ID, actual count when getting by type) - error: None if no error occurred, otherwise the error message Examples: - # Get a specific workflow by ID to see full workflow instructions - result = get_workflow_by_id_tool("atlan-snowflake-miner-1714638976") - - # Access full workflow steps and transformations - workflow = 
result.get("workflow") - # The workflow object contains complete workflow definition with workflow_steps and workflow_spec + # Get a specific workflow by ID (full details with steps and spec) + result = get_workflows_tool(id="atlan-snowflake-miner-1714638976") + workflow = result.get("workflows", [])[0] # Single workflow in list + # workflow contains complete workflow definition with workflow_steps and workflow_spec + + # Get Snowflake workflows by type (metadata only) + result = get_workflows_tool(workflow_package_name="atlan-snowflake") + workflows = result.get("workflows", []) # List of workflows + + # Get BigQuery workflows with custom limit + result = get_workflows_tool(workflow_package_name="atlan-bigquery", max_results=50) + + # Get workflows using enum name + result = get_workflows_tool(workflow_package_name="SNOWFLAKE") """ - return get_workflow_by_id(id=id) + return get_workflows( + id=id, + workflow_package_name=workflow_package_name, + max_results=max_results, + ) @mcp.tool() @@ -1061,14 +1031,14 @@ def get_workflow_runs_tool( IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow instructions, steps, or transformations. To get complete workflow details including all steps and - transformations, use get_workflow_by_id_tool instead. + transformations, use get_workflows_tool with an id parameter instead. 
When to use this tool: - Use when you need to find all workflow_runs of a specific workflow by execution phase - Use when you need to find workflow_runs across multiple workflows filtered by status and time - Use when monitoring workflow execution patterns or analyzing failures - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs - - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) + - Do NOT use if you need the complete workflow specification (use get_workflows_tool with id instead) Args: workflow_name (str, optional): Name of the workflow (template) as displayed in the UI diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index 2c9ef278..4bc6a809 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -19,8 +19,7 @@ ) from .workflows import ( get_workflow_package_names, - get_workflows_by_type, - get_workflow_by_id, + get_workflows, get_workflow_runs, ) @@ -34,8 +33,7 @@ "create_glossary_assets", "create_glossary_term_assets", "get_workflow_package_names", - "get_workflows_by_type", - "get_workflow_by_id", + "get_workflows", "get_workflow_runs", "CertificateStatus", "UpdatableAttribute", diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py index 18fb9589..95f7a5b7 100644 --- a/modelcontextprotocol/tools/workflows.py +++ b/modelcontextprotocol/tools/workflows.py @@ -189,110 +189,110 @@ def get_workflow_package_names() -> List[str]: """ return [pkg.value for pkg in WorkflowPackage] -def get_workflows_by_type(workflow_package_name: Union[WorkflowPackage, str], max_results: int = 10) -> Dict[str, Any]: +def get_workflows( + id: Optional[str] = None, + workflow_package_name: Optional[Union[WorkflowPackage, str]] = None, + max_results: int = 10, +) -> Dict[str, Any]: """ - Retrieve workflows (WorkflowTemplate) by type (workflow package name). 
+ Retrieve workflows (WorkflowTemplate) by ID or by package type. + + This function supports two main use cases: + 1. Get a specific workflow by ID (returns full workflow details including steps and spec) + 2. Get multiple workflows by package type (returns metadata only, no full details) Args: - workflow_package_name (Union[WorkflowPackage, str]): Workflow package type (e.g., WorkflowPackage.SNOWFLAKE or "atlan-snowflake"). - max_results (int, optional): Maximum number of workflows to return. Defaults to 10. + id (str, optional): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). + If provided, returns a single workflow with full details (workflow_steps and workflow_spec). + workflow_package_name (Union[WorkflowPackage, str], optional): Workflow package type + (e.g., WorkflowPackage.SNOWFLAKE or "atlan-snowflake"). If provided, returns list of workflows + with metadata only (no workflow_steps or workflow_spec). + max_results (int, optional): Maximum number of workflows to return when getting by type. Defaults to 10. + Only used when workflow_package_name is provided. Returns: Dict[str, Any]: Dictionary containing: - - workflows: List of workflows (WorkflowTemplate) with their configurations - - total: Total count of workflows + - workflows: List of workflow (WorkflowTemplate) dictionaries. When getting by ID, contains single item. 
+ - total: Total count of workflows (1 when getting by ID, actual count when getting by type) - error: None if no error occurred, otherwise the error message Examples: - # Get Snowflake workflows + # Get a specific workflow by ID (full details) + result = get_workflows(id="atlan-snowflake-miner-1714638976") + # Returns: {"workflows": [{...full details with workflow_steps and workflow_spec...}], "total": 1, "error": None} + + # Get Snowflake workflows by type (metadata only) from pyatlan.model.enums import WorkflowPackage - result = get_workflows_by_type(WorkflowPackage.SNOWFLAKE) + result = get_workflows(workflow_package_name=WorkflowPackage.SNOWFLAKE) # Get workflows with custom limit - result = get_workflows_by_type(WorkflowPackage.BIGQUERY, max_results=50) + result = get_workflows(workflow_package_name="atlan-bigquery", max_results=50) """ - logger.info(f"Starting workflow retrieval with workflow_package_name={workflow_package_name}, max_results={max_results}") + # Validate exactly one parameter is provided + if not id and not workflow_package_name: + raise ValueError("Either id or workflow_package_name must be provided") + if id and workflow_package_name: + raise ValueError("Cannot provide both id and workflow_package_name") try: client = get_atlan_client() - # Retrieve workflows using the pyatlan SDK - logger.debug(f"Calling client.workflow.find_by_type() with workflow_package_name={workflow_package_name}") - results = client.workflow.find_by_type(prefix=workflow_package_name, max_results=max_results) - - logger.info(f"Successfully retrieved workflows") - - # Process the response - workflows = results if results else [] - total = len(workflows) if workflows else 0 + if id is not None: + # Get single workflow by ID - always include full details + logger.info(f"Retrieving workflow with ID: {id}") + + if not id: + raise ValueError("id cannot be empty") - # Extract key workflow information - processed_workflows = [_result_to_dict(workflow) for workflow in 
workflows] + logger.debug(f"Calling client.workflow.find_by_id() with id={id}") + workflow = client.workflow.find_by_id(id=id) - return { - "workflows": processed_workflows, - "total": total, - "error": None, - } - - except Exception as e: - error_msg = f"Failed to retrieve workflows: {str(e)}" - logger.error(error_msg, exc_info=True) - return { - "workflows": [], - "total": 0, - "error": error_msg, - } + if workflow is None: + logger.warning(f"Workflow with ID '{id}' not found") + return { + "workflows": [], + "total": 0, + "error": f"Workflow with ID '{id}' not found", + } + logger.info(f"Successfully retrieved workflow with ID: {id}") + # Always include full details (include_dag=True) when getting by ID + processed_workflow = _result_to_dict(workflow, include_dag=True) -def get_workflow_by_id(id: str) -> Dict[str, Any]: - """ - Retrieve a specific workflow (WorkflowTemplate) by its ID. - - Args: - id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). 
+ return { + "workflows": [processed_workflow] if processed_workflow else [], + "total": 1 if processed_workflow else 0, + "error": None, + } - Returns: - Dict[str, Any]: Dictionary containing: - - workflow: The workflow (WorkflowTemplate) object with its configuration, or None if not found - - error: None if no error occurred, otherwise the error message + elif workflow_package_name is not None: + # Get workflows by type - always metadata only + logger.info(f"Retrieving workflows with workflow_package_name={workflow_package_name}, max_results={max_results}") - Examples: - # Get a specific workflow by ID - result = get_workflow_by_id("atlan-snowflake-miner-1714638976") - """ - logger.info(f"Retrieving workflow with ID: {id}") + logger.debug(f"Calling client.workflow.find_by_type() with workflow_package_name={workflow_package_name}") + results = client.workflow.find_by_type(prefix=workflow_package_name, max_results=max_results) - try: - if not id: - raise ValueError("id cannot be empty") + logger.info(f"Successfully retrieved workflows") - client = get_atlan_client() + # Process the response + workflows = results if results else [] + total = len(workflows) if workflows else 0 - # Retrieve workflow using the pyatlan SDK - logger.debug(f"Calling client.workflow.find_by_id() with id={id}") - workflow = client.workflow.find_by_id(id=id) + # Always metadata only (include_dag=False) when getting by type + processed_workflows = [_result_to_dict(workflow, include_dag=False) for workflow in workflows] - if workflow is None: - logger.warning(f"Workflow with ID '{id}' not found") return { - "workflow": None, - "error": f"Workflow with ID '{id}' not found", + "workflows": processed_workflows, + "total": total, + "error": None, } - logger.info(f"Successfully retrieved workflow with ID: {id}") - processed_workflow = _result_to_dict(workflow, include_dag=True) - - return { - "workflow": processed_workflow, - "error": None, - } - except Exception as e: - error_msg = f"Failed to 
retrieve workflow with ID '{id}': {str(e)}" + error_msg = f"Failed to retrieve workflows: {str(e)}" logger.error(error_msg, exc_info=True) return { - "workflow": None, + "workflows": [], + "total": 0, "error": error_msg, } From 870a8646e6b3466b0fb064fed1a6f29030da69bd Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 26 Nov 2025 09:28:04 -0800 Subject: [PATCH 05/11] Fix to LLM instructions for get_workflow_runs_tool to use the correct parameters workflow_phase vs status --- modelcontextprotocol/server.py | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 1e586cb5..cacf747b 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -1035,7 +1035,9 @@ def get_workflow_runs_tool( When to use this tool: - Use when you need to find all workflow_runs of a specific workflow by execution phase + → Provide workflow_name AND workflow_phase (do NOT use status) - Use when you need to find workflow_runs across multiple workflows filtered by status and time + → Provide status (and optionally started_at/finished_at), do NOT provide workflow_name or workflow_phase - Use when monitoring workflow execution patterns or analyzing failures - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs - Do NOT use if you need the complete workflow specification (use get_workflows_tool with id instead) @@ -1045,16 +1047,22 @@ def get_workflow_runs_tool( (e.g., 'atlan-snowflake-miner-1714638976'). If provided, filters runs for that specific workflow. This refers to the workflow name, not individual workflow_run IDs. workflow_phase (str, optional): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. - Case-insensitive matching is supported. Required if workflow_name is provided and status is not. + Case-insensitive matching is supported. 
+ REQUIRED when workflow_name is provided. DO NOT use status when workflow_name is provided. status (List[str], optional): List of workflow_run phases to filter by. Common values: - 'Succeeded': Successfully completed workflow_runs - 'Failed': Workflow_runs that failed - 'Running': Currently executing workflow_runs - 'Error': Workflow_runs that encountered errors - 'Pending': Workflow_runs waiting to start - Case-insensitive matching is supported. Required if workflow_name is not provided. - Can also be used with workflow_name (will use first item from list as workflow_phase). + Case-insensitive matching is supported. + REQUIRED when workflow_name is NOT provided. DO NOT use status when workflow_name is provided - use workflow_phase instead. Example: ['Succeeded', 'Failed'] + + Parameter Usage Rules: + - If workflow_name is provided: MUST use workflow_phase (single phase string). Do NOT use status. + - If workflow_name is NOT provided: MUST use status (list of phases). Do NOT use workflow_phase. + - These are mutually exclusive usage patterns - do not mix them. started_at (str, optional): Lower bound on 'status.startedAt' timestamp. 
Accepts: - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' - ISO 8601 format: '2024-01-01T00:00:00Z' @@ -1094,29 +1102,29 @@ def get_workflow_runs_tool( - error: None if no error occurred, otherwise the error message Examples: - # Get succeeded workflow_runs for a specific workflow - result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", workflow_phase="Succeeded") + # CORRECT: Get succeeded workflow_runs for a specific workflow (use workflow_phase, NOT status) + result = get_workflow_runs_tool(workflow_name="atlan-snowflake-miner-1714638976", workflow_phase="Succeeded") - # Get running workflow_runs for a specific workflow - result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", workflow_phase="Running") + # CORRECT: Get running workflow_runs for a specific workflow (use workflow_phase, NOT status) + result = get_workflow_runs_tool(workflow_name="atlan-snowflake-miner-1714638976", workflow_phase="Running") - # Get failed workflow_runs with pagination - result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", workflow_phase="Failed", from_=0, size=50) + # CORRECT: Get failed workflow_runs with pagination (use workflow_phase, NOT status) + result = get_workflow_runs_tool(workflow_name="atlan-snowflake-miner-1714638976", workflow_phase="Failed", from_=0, size=50) - # Get succeeded workflow_runs from the last 2 hours (across all workflows) + # CORRECT: Get succeeded workflow_runs from the last 2 hours (across all workflows - use status, NOT workflow_phase) result = get_workflow_runs_tool(status=["Succeeded"], started_at="now-2h") - # Get failed workflow_runs from the last 24 hours (across all workflows) + # CORRECT: Get failed workflow_runs from the last 24 hours (across all workflows - use status, NOT workflow_phase) result = get_workflow_runs_tool(status=["Failed"], started_at="now-24h") - # Get multiple statuses with both time filters (across all workflows) + # CORRECT: Get multiple statuses with both time filters 
(across all workflows - use status, NOT workflow_phase) result = get_workflow_runs_tool( status=["Succeeded", "Failed"], started_at="now-7d", finished_at="now-1h" ) - # Get running workflow_runs (across all workflows) + # CORRECT: Get running workflow_runs (across all workflows - use status, NOT workflow_phase) result = get_workflow_runs_tool(status=["Running"]) # Analyze workflow_run durations From ef286c41c5a234a3d990c27f81cb703d5e718e27 Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 3 Dec 2025 18:08:34 -0800 Subject: [PATCH 06/11] Update get_workflow_runs to use parse_list_parameter in parsing incoming list --- modelcontextprotocol/server.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index cacf747b..1fea29e6 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -1010,7 +1010,7 @@ def get_workflows_tool( def get_workflow_runs_tool( workflow_name: str | None = None, workflow_phase: str | None = None, - status: List[str] | None = None, + status: str | None = None, # This is technically a list but we get the input as a string so we parse it into a list started_at: str | None = None, finished_at: str | None = None, from_: int = 0, @@ -1049,15 +1049,19 @@ def get_workflow_runs_tool( workflow_phase (str, optional): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. Case-insensitive matching is supported. REQUIRED when workflow_name is provided. DO NOT use status when workflow_name is provided. - status (List[str], optional): List of workflow_run phases to filter by. 
Common values: - - 'Succeeded': Successfully completed workflow_runs - - 'Failed': Workflow_runs that failed - - 'Running': Currently executing workflow_runs - - 'Error': Workflow_runs that encountered errors - - 'Pending': Workflow_runs waiting to start + status (List[str], optional): Array/list of workflow_run phases to filter by. + IMPORTANT: Must be an array/list type, NOT a JSON string. + Correct format: ["Succeeded", "Failed"] (array/list) + Incorrect format: '["Succeeded", "Failed"]' (string - will be rejected by schema validation) + Common values: + - 'Succeeded': Successfully completed workflow_runs + - 'Failed': Workflow_runs that failed + - 'Running': Currently executing workflow_runs + - 'Error': Workflow_runs that encountered errors + - 'Pending': Workflow_runs waiting to start Case-insensitive matching is supported. REQUIRED when workflow_name is NOT provided. DO NOT use status when workflow_name is provided - use workflow_phase instead. - Example: ['Succeeded', 'Failed'] + Example: ["Succeeded", "Failed"] # Array/list, not a string Parameter Usage Rules: - If workflow_name is provided: MUST use workflow_phase (single phase string). Do NOT use status. 
@@ -1137,6 +1141,16 @@ def get_workflow_runs_tool( failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"] print(f"Found {len(failed_runs)} failed workflow_runs in the time range") """ + try: + # Parse list parameter if it comes as a JSON string (common with MCP clients) + status = parse_list_parameter(status) + except (json.JSONDecodeError, ValueError) as e: + return { + "runs": [], + "total": 0, + "error": f"Parameter parsing error for status: {str(e)}" + } + return get_workflow_runs( workflow_name=workflow_name, workflow_phase=workflow_phase, From 1b0d4da748393f2796d89e558853085252afe710 Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 3 Dec 2025 18:17:40 -0800 Subject: [PATCH 07/11] pre-commit check fixes --- modelcontextprotocol/server.py | 25 ++-- modelcontextprotocol/tools/workflows.py | 179 ++++++++++++++---------- 2 files changed, 115 insertions(+), 89 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 9bab7cec..c3521a20 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -1013,7 +1013,8 @@ def get_workflows_tool( def get_workflow_runs_tool( workflow_name: str | None = None, workflow_phase: str | None = None, - status: str | None = None, # This is technically a list but we get the input as a string so we parse it into a list + status: str + | None = None, # This is technically a list but we get the input as a string so we parse it into a list started_at: str | None = None, finished_at: str | None = None, from_: int = 0, @@ -1046,14 +1047,14 @@ def get_workflow_runs_tool( - Do NOT use if you need the complete workflow specification (use get_workflows_tool with id instead) Args: - workflow_name (str, optional): Name of the workflow (template) as displayed in the UI + workflow_name (str, optional): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). 
If provided, filters runs for that specific workflow. This refers to the workflow name, not individual workflow_run IDs. workflow_phase (str, optional): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. - Case-insensitive matching is supported. + Case-insensitive matching is supported. REQUIRED when workflow_name is provided. DO NOT use status when workflow_name is provided. - status (List[str], optional): Array/list of workflow_run phases to filter by. - IMPORTANT: Must be an array/list type, NOT a JSON string. + status (List[str], optional): Array/list of workflow_run phases to filter by. + IMPORTANT: Must be an array/list type, NOT a JSON string. Correct format: ["Succeeded", "Failed"] (array/list) Incorrect format: '["Succeeded", "Failed"]' (string - will be rejected by schema validation) Common values: @@ -1062,10 +1063,10 @@ def get_workflow_runs_tool( - 'Running': Currently executing workflow_runs - 'Error': Workflow_runs that encountered errors - 'Pending': Workflow_runs waiting to start - Case-insensitive matching is supported. + Case-insensitive matching is supported. REQUIRED when workflow_name is NOT provided. DO NOT use status when workflow_name is provided - use workflow_phase instead. Example: ["Succeeded", "Failed"] # Array/list, not a string - + Parameter Usage Rules: - If workflow_name is provided: MUST use workflow_phase (single phase string). Do NOT use status. - If workflow_name is NOT provided: MUST use status (list of phases). Do NOT use workflow_phase. 
@@ -1095,7 +1096,7 @@ def get_workflow_runs_tool( - run_progress: Progress indicator (e.g., "1/3") - run_cpu_usage: CPU resource usage duration - run_memory_usage: Memory resource usage duration - + Workflow Metadata: - workflow_id: Reference to the workflow (template) used - workflow_package_name: Package identifier @@ -1133,13 +1134,13 @@ def get_workflow_runs_tool( # CORRECT: Get running workflow_runs (across all workflows - use status, NOT workflow_phase) result = get_workflow_runs_tool(status=["Running"]) - + # Analyze workflow_run durations runs = result.get("runs", []) for run in runs: if run.get("run_finished_at") and run.get("run_started_at"): print(f"Workflow run {run['run_id']} took {run.get('run_estimated_duration')}") - + # Analyze failure rates failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"] print(f"Found {len(failed_runs)} failed workflow_runs in the time range") @@ -1151,9 +1152,9 @@ def get_workflow_runs_tool( return { "runs": [], "total": 0, - "error": f"Parameter parsing error for status: {str(e)}" + "error": f"Parameter parsing error for status: {str(e)}", } - + return get_workflow_runs( workflow_name=workflow_name, workflow_phase=workflow_phase, diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py index 95f7a5b7..89b6c762 100644 --- a/modelcontextprotocol/tools/workflows.py +++ b/modelcontextprotocol/tools/workflows.py @@ -17,11 +17,6 @@ from client import get_atlan_client from pyatlan.model.enums import WorkflowPackage, AtlanWorkflowPhase -from pyatlan.model.workflow import Workflow, WorkflowSchedule - -# from dotenv import load_dotenv - -# load_dotenv(dotenv_path="/Users/christopher.tin/Repos/.env") logger = logging.getLogger(__name__) @@ -33,7 +28,7 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str Note: There is a naming confusion in the Argo/Kubernetes API: - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template 
definition) - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - + This function handles both types and returns standardized metadata dictionaries. Args: @@ -46,7 +41,7 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str """ if result is None: return {} - + try: dict_repr = result.dict(by_alias=True, exclude_unset=True) except (AttributeError, TypeError) as e: @@ -55,7 +50,7 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str source = dict_repr.get("_source", None) if source is None: return dict_repr - + source_type = source.get("kind", None) if source_type == "Workflow": return extract_workflow_metadata(dict_repr) @@ -66,7 +61,9 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str return None # Unsupported workflow type -def extract_workflow_template_metadata(dict_repr: Dict[str, Any], include_dag: bool = False) -> Dict[str, Any]: +def extract_workflow_template_metadata( + dict_repr: Dict[str, Any], include_dag: bool = False +) -> Dict[str, Any]: """ Extract useful metadata from a workflow (WorkflowTemplate). 
@@ -87,38 +84,43 @@ def extract_workflow_template_metadata(dict_repr: Dict[str, Any], include_dag: b - If include_dag=True: workflow_steps and workflow_spec """ source = dict_repr.get("_source") or {} - metadata = source.get('metadata') or {} - labels = metadata.get('labels') or {} - annotations = metadata.get('annotations') or {} + metadata = source.get("metadata") or {} + labels = metadata.get("labels") or {} + annotations = metadata.get("annotations") or {} base_output = { - 'template_name': metadata.get('name'), - 'package_name': annotations.get('package.argoproj.io/name'), - 'package_version': labels.get('package.argoproj.io/version'), - 'source_system': labels.get('orchestration.atlan.com/source'), - 'source_category': labels.get('orchestration.atlan.com/sourceCategory'), - 'workflow_type': labels.get('orchestration.atlan.com/type'), - 'certified': labels.get('orchestration.atlan.com/certified') == 'true', - 'verified': labels.get('orchestration.atlan.com/verified') == 'true', - 'creator_email': labels.get('workflows.argoproj.io/creator-email'), - 'creator_username': labels.get('workflows.argoproj.io/creator-preferred-username'), - 'modifier_email': labels.get('workflows.argoproj.io/modifier-email'), - 'modifier_username': labels.get('workflows.argoproj.io/modifier-preferred-username'), - 'creation_timestamp': metadata.get('creationTimestamp'), - 'package_author': annotations.get('package.argoproj.io/author'), - 'package_description': annotations.get('package.argoproj.io/description'), - 'package_repository': annotations.get('package.argoproj.io/repository') - } + "template_name": metadata.get("name"), + "package_name": annotations.get("package.argoproj.io/name"), + "package_version": labels.get("package.argoproj.io/version"), + "source_system": labels.get("orchestration.atlan.com/source"), + "source_category": labels.get("orchestration.atlan.com/sourceCategory"), + "workflow_type": labels.get("orchestration.atlan.com/type"), + "certified": 
labels.get("orchestration.atlan.com/certified") == "true", + "verified": labels.get("orchestration.atlan.com/verified") == "true", + "creator_email": labels.get("workflows.argoproj.io/creator-email"), + "creator_username": labels.get( + "workflows.argoproj.io/creator-preferred-username" + ), + "modifier_email": labels.get("workflows.argoproj.io/modifier-email"), + "modifier_username": labels.get( + "workflows.argoproj.io/modifier-preferred-username" + ), + "creation_timestamp": metadata.get("creationTimestamp"), + "package_author": annotations.get("package.argoproj.io/author"), + "package_description": annotations.get("package.argoproj.io/description"), + "package_repository": annotations.get("package.argoproj.io/repository"), + } if not include_dag: return base_output else: return { **base_output, - 'workflow_steps': annotations.get('orchestration.atlan.com/workflow-steps'), - 'workflow_spec': source.get('spec') + "workflow_steps": annotations.get("orchestration.atlan.com/workflow-steps"), + "workflow_spec": source.get("spec"), } + def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: """ Extract comprehensive metadata from a workflow_run (Workflow executed instance/run). 
@@ -141,7 +143,7 @@ def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: - run_started_at, run_finished_at: Execution timestamps - run_estimated_duration, run_progress: Execution progress - run_cpu_usage, run_memory_usage: Resource usage - + Workflow Metadata (workflow-level): - workflow_id: Reference to the workflow (template) used - workflow_package_name: Package identifier @@ -150,45 +152,51 @@ def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: - workflow_creation_timestamp, workflow_archiving_status: Lifecycle info """ source = dict_repr.get("_source") or {} - metadata = source.get('metadata') or {} - labels = metadata.get('labels') or {} - annotations = metadata.get('annotations') or {} - status = source.get('status') or {} + metadata = source.get("metadata") or {} + labels = metadata.get("labels") or {} + annotations = metadata.get("annotations") or {} + status = source.get("status") or {} return { # Run Metadata - 'run_id': dict_repr.get('_id') or metadata.get('name'), - 'run_phase': status.get('phase'), - 'run_started_at': status.get('startedat') or status.get('startedAt'), - 'run_finished_at': status.get('finishedat') or status.get('finishedAt'), - 'run_estimated_duration': status.get('estimatedDuration'), - 'run_progress': status.get('progress'), - 'run_cpu_usage': status.get('resourcesDuration', {}).get('cpu'), - 'run_memory_usage': status.get('resourcesDuration', {}).get('memory'), - + "run_id": dict_repr.get("_id") or metadata.get("name"), + "run_phase": status.get("phase"), + "run_started_at": status.get("startedat") or status.get("startedAt"), + "run_finished_at": status.get("finishedat") or status.get("finishedAt"), + "run_estimated_duration": status.get("estimatedDuration"), + "run_progress": status.get("progress"), + "run_cpu_usage": status.get("resourcesDuration", {}).get("cpu"), + "run_memory_usage": status.get("resourcesDuration", {}).get("memory"), # Workflow Metadata - 'workflow_id': 
labels.get('workflows.argoproj.io/workflow-template'), + "workflow_id": labels.get("workflows.argoproj.io/workflow-template"), # 'cron_workflow': labels.get('workflows.argoproj.io/cron-workflow'), - 'workflow_package_name': annotations.get('package.argoproj.io/name'), - 'workflow_cron_schedule': annotations.get('orchestration.atlan.com/schedule'), - 'workflow_cron_timezone': annotations.get('orchestration.atlan.com/timezone'), - 'workflow_creator_id': labels.get('workflows.argoproj.io/creator'), - 'workflow_creator_email': labels.get('workflows.argoproj.io/creator-email'), - 'workflow_creator_username': labels.get('workflows.argoproj.io/creator-preferred-username'), - 'workflow_modifier_id': labels.get('workflows.argoproj.io/modifier'), - 'workflow_modifier_email': labels.get('workflows.argoproj.io/modifier-email'), - 'workflow_modifier_username': labels.get('workflows.argoproj.io/modifier-preferred-username'), - 'workflow_creation_timestamp': metadata.get('creationTimestamp'), - 'workflow_archiving_status': labels.get('workflows.argoproj.io/workflow-archiving-status') + "workflow_package_name": annotations.get("package.argoproj.io/name"), + "workflow_cron_schedule": annotations.get("orchestration.atlan.com/schedule"), + "workflow_cron_timezone": annotations.get("orchestration.atlan.com/timezone"), + "workflow_creator_id": labels.get("workflows.argoproj.io/creator"), + "workflow_creator_email": labels.get("workflows.argoproj.io/creator-email"), + "workflow_creator_username": labels.get( + "workflows.argoproj.io/creator-preferred-username" + ), + "workflow_modifier_id": labels.get("workflows.argoproj.io/modifier"), + "workflow_modifier_email": labels.get("workflows.argoproj.io/modifier-email"), + "workflow_modifier_username": labels.get( + "workflows.argoproj.io/modifier-preferred-username" + ), + "workflow_creation_timestamp": metadata.get("creationTimestamp"), + "workflow_archiving_status": labels.get( + "workflows.argoproj.io/workflow-archiving-status" + ), } def 
get_workflow_package_names() -> List[str]: """ - Get the values of all workflow packages. + Get the values of all workflow packages. """ return [pkg.value for pkg in WorkflowPackage] + def get_workflows( id: Optional[str] = None, workflow_package_name: Optional[Union[WorkflowPackage, str]] = None, @@ -204,8 +212,8 @@ def get_workflows( Args: id (str, optional): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). If provided, returns a single workflow with full details (workflow_steps and workflow_spec). - workflow_package_name (Union[WorkflowPackage, str], optional): Workflow package type - (e.g., WorkflowPackage.SNOWFLAKE or "atlan-snowflake"). If provided, returns list of workflows + workflow_package_name (Union[WorkflowPackage, str], optional): Workflow package type + (e.g., WorkflowPackage.SNOWFLAKE or "atlan-snowflake"). If provided, returns list of workflows with metadata only (no workflow_steps or workflow_spec). max_results (int, optional): Maximum number of workflows to return when getting by type. Defaults to 10. Only used when workflow_package_name is provided. 
@@ -240,7 +248,7 @@ def get_workflows( if id is not None: # Get single workflow by ID - always include full details logger.info(f"Retrieving workflow with ID: {id}") - + if not id: raise ValueError("id cannot be empty") @@ -267,19 +275,27 @@ def get_workflows( elif workflow_package_name is not None: # Get workflows by type - always metadata only - logger.info(f"Retrieving workflows with workflow_package_name={workflow_package_name}, max_results={max_results}") + logger.info( + f"Retrieving workflows with workflow_package_name={workflow_package_name}, max_results={max_results}" + ) - logger.debug(f"Calling client.workflow.find_by_type() with workflow_package_name={workflow_package_name}") - results = client.workflow.find_by_type(prefix=workflow_package_name, max_results=max_results) + logger.debug( + f"Calling client.workflow.find_by_type() with workflow_package_name={workflow_package_name}" + ) + results = client.workflow.find_by_type( + prefix=workflow_package_name, max_results=max_results + ) - logger.info(f"Successfully retrieved workflows") + logger.info("Successfully retrieved workflows") # Process the response workflows = results if results else [] total = len(workflows) if workflows else 0 # Always metadata only (include_dag=False) when getting by type - processed_workflows = [_result_to_dict(workflow, include_dag=False) for workflow in workflows] + processed_workflows = [ + _result_to_dict(workflow, include_dag=False) for workflow in workflows + ] return { "workflows": processed_workflows, @@ -314,12 +330,12 @@ def get_workflow_runs( 2. Filter across all workflows by status list and optional time ranges Args: - workflow_name (str, optional): Name of the workflow (template) as displayed in the UI + workflow_name (str, optional): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). If provided, filters runs for that specific workflow. 
- workflow_phase (Union[AtlanWorkflowPhase, str], optional): Phase of the workflow_run + workflow_phase (Union[AtlanWorkflowPhase, str], optional): Phase of the workflow_run (e.g., Succeeded, Running, Failed). Required if workflow_name is provided and status is not. status (Union[List[AtlanWorkflowPhase], List[str]], optional): List of workflow_run phases to filter by - (e.g., ['Succeeded', 'Failed']). Required if workflow_name is not provided. Can also be used + (e.g., ['Succeeded', 'Failed']). Required if workflow_name is not provided. Can also be used with workflow_name (will use first item from list as workflow_phase). started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' @@ -371,15 +387,21 @@ def get_workflow_runs( if workflow_name: # Use get_runs() API - single workflow, single phase if not workflow_phase and not status: - raise ValueError("Either workflow_phase or status must be provided when workflow_name is specified") - + raise ValueError( + "Either workflow_phase or status must be provided when workflow_name is specified" + ) + # Use workflow_phase if provided, otherwise use first item from status list phase_to_use = workflow_phase if not phase_to_use and status: - phase_to_use = status[0] if isinstance(status, list) and len(status) > 0 else None - + phase_to_use = ( + status[0] if isinstance(status, list) and len(status) > 0 else None + ) + if not phase_to_use: - raise ValueError("workflow_phase or status must be provided when workflow_name is specified") + raise ValueError( + "workflow_phase or status must be provided when workflow_name is specified" + ) # Convert string to AtlanWorkflowPhase enum if needed if isinstance(phase_to_use, str): @@ -395,7 +417,9 @@ def get_workflow_runs( raise ValueError(f"Invalid workflow phase: {phase_to_use}") # Retrieve workflow runs using the pyatlan SDK - logger.debug(f"Calling client.workflow.get_runs() with 
workflow_name={workflow_name}, workflow_phase={phase_to_use}") + logger.debug( + f"Calling client.workflow.get_runs() with workflow_name={workflow_name}, workflow_phase={phase_to_use}" + ) response = client.workflow.get_runs( workflow_name=workflow_name, workflow_phase=phase_to_use, @@ -407,7 +431,9 @@ def get_workflow_runs( else: # Use find_runs_by_status_and_time_range() API - all workflows, status list, time ranges if not status: - raise ValueError("status must be provided when workflow_name is not specified") + raise ValueError( + "status must be provided when workflow_name is not specified" + ) # Convert string statuses to AtlanWorkflowPhase enums if needed status_list = [] @@ -478,4 +504,3 @@ def get_workflow_runs( "total": 0, "error": error_msg, } - From 9b5ed83977ec446878c42e517f2234b89380b421 Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 3 Dec 2025 18:22:53 -0800 Subject: [PATCH 08/11] Fix ruff-format error on pre-commit check --- modelcontextprotocol/server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index c3521a20..2dae012a 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -1164,6 +1164,8 @@ def get_workflow_runs_tool( from_=from_, size=size, ) + + def create_domains(domains) -> List[Dict[str, Any]]: """ Create Data Domains or Sub Domains in Atlan. 
From f48b673166f7b7703db82e8870ad3023e585ec64 Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Wed, 3 Dec 2025 18:25:33 -0800 Subject: [PATCH 09/11] Add decorator for create_domains tool in server.py --- modelcontextprotocol/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 2dae012a..f0ca4c71 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -1166,6 +1166,7 @@ def get_workflow_runs_tool( ) +@mcp.tool() def create_domains(domains) -> List[Dict[str, Any]]: """ Create Data Domains or Sub Domains in Atlan. From 4d1633cd816213bea9199bffbc9accbf9e66033e Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Mon, 22 Dec 2025 15:01:30 -0800 Subject: [PATCH 10/11] Refactor naming convention notes in workflows.py and server.py to remove API terminology. Simplified descriptions to enhance understanding of 'workflow' and 'workflow_run' distinctions. --- modelcontextprotocol/server.py | 4 ++-- modelcontextprotocol/tools/workflows.py | 15 +++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index f0ca4c71..aaa4e38c 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -938,7 +938,7 @@ def get_workflows_tool( """ Retrieve workflows (WorkflowTemplate) by ID or by package type. - Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: + Naming Convention Note: - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) @@ -1023,7 +1023,7 @@ def get_workflow_runs_tool( """ Retrieve workflow_runs with flexible filtering options. 
- Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: + Naming Convention Note: - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py index 89b6c762..568743d4 100644 --- a/modelcontextprotocol/tools/workflows.py +++ b/modelcontextprotocol/tools/workflows.py @@ -1,13 +1,8 @@ """Tools for retrieving and managing workflows in Atlan. Naming Convention Note: -The Argo/Kubernetes API uses terminology that can be confusing: -- "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) -- "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - -Throughout this module: -- "workflow" refers to WorkflowTemplate (kind="WorkflowTemplate") - the template definition -- "workflow_run" refers to Workflow (kind="Workflow") - an executed instance/run +- "workflow" refers to a workflow template definition (kind="WorkflowTemplate") +- "workflow_run" refers to an executed instance/run (kind="Workflow") - The extract_workflow_metadata() function returns fields prefixed with "run_*" for execution data and "workflow_*" for workflow-level metadata to clearly distinguish between the two. """ @@ -25,7 +20,7 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str """ Process a workflow or workflow_run object into a standardized dictionary format. - Note: There is a naming confusion in the Argo/Kubernetes API: + Note: This function handles two types of workflow objects: - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) @@ -67,7 +62,7 @@ def extract_workflow_template_metadata( """ Extract useful metadata from a workflow (WorkflowTemplate). 
- Note: In Argo terminology, "WorkflowTemplate" (kind="WorkflowTemplate") is a workflow (template definition). + Note: "WorkflowTemplate" (kind="WorkflowTemplate") is a workflow (template definition). This function extracts workflow metadata including package info, source system, certification status, etc. Args: @@ -125,7 +120,7 @@ def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: """ Extract comprehensive metadata from a workflow_run (Workflow executed instance/run). - Note: In Argo terminology, "Workflow" (kind="Workflow") is a workflow_run (executed instance/run). + Note: "Workflow" (kind="Workflow") is a workflow_run (executed instance/run). This is different from "WorkflowTemplate" which is a workflow (template definition). The returned dictionary uses a two-tier naming convention: From 0095946ec0d9a7b30603faeeaf690b68138b692f Mon Sep 17 00:00:00 2001 From: Christopher Tin Date: Mon, 22 Dec 2025 15:06:58 -0800 Subject: [PATCH 11/11] Refactor documentation in server.py and workflows.py to clarify workflow and workflow_run terminology. Removed API-specific language for improved readability and consistency. --- modelcontextprotocol/server.py | 14 +++---------- modelcontextprotocol/tools/workflows.py | 28 +++++++++---------------- 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index aaa4e38c..e63ae155 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -936,11 +936,7 @@ def get_workflows_tool( max_results: int = 10, ) -> Dict[str, Any]: """ - Retrieve workflows (WorkflowTemplate) by ID or by package type. - - Naming Convention Note: - - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) + Retrieve workflows by ID or by package type. This tool supports two main use cases: 1. 
Get a specific workflow by ID (returns full workflow details including steps and spec) @@ -949,7 +945,7 @@ def get_workflows_tool( IMPORTANT: When getting by ID, this tool returns full workflow instructions with all steps and transformations. When getting by type, this tool returns only basic metadata to avoid large responses. - Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. + Note: This tool only returns workflows (templates), not workflow_runs. To get workflow_run information, use get_workflow_runs_tool. When to use this tool: @@ -971,7 +967,7 @@ def get_workflows_tool( Returns: Dict[str, Any]: Dictionary containing: - - workflows: List of workflow (WorkflowTemplate) dictionaries. When getting by ID, contains single item. + - workflows: List of workflow dictionaries. When getting by ID, contains single item. Each workflow contains: - template_name: Name of the template - package_name, package_version: Package information @@ -1023,10 +1019,6 @@ def get_workflow_runs_tool( """ Retrieve workflow_runs with flexible filtering options. - Naming Convention Note: - - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - This tool supports two main use cases: 1. Filter by specific workflow name and phase (single workflow) 2. Filter across all workflows by status list and optional time ranges diff --git a/modelcontextprotocol/tools/workflows.py b/modelcontextprotocol/tools/workflows.py index 568743d4..eef2c5b8 100644 --- a/modelcontextprotocol/tools/workflows.py +++ b/modelcontextprotocol/tools/workflows.py @@ -1,8 +1,8 @@ """Tools for retrieving and managing workflows in Atlan. 
Naming Convention Note: -- "workflow" refers to a workflow template definition (kind="WorkflowTemplate") -- "workflow_run" refers to an executed instance/run (kind="Workflow") +- "workflow" refers to a workflow template definition +- "workflow_run" refers to an executed instance/run - The extract_workflow_metadata() function returns fields prefixed with "run_*" for execution data and "workflow_*" for workflow-level metadata to clearly distinguish between the two. """ @@ -20,15 +20,11 @@ def _result_to_dict(result: Any, include_dag: bool = False) -> Optional[Dict[str """ Process a workflow or workflow_run object into a standardized dictionary format. - Note: This function handles two types of workflow objects: - - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) - - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) - - This function handles both types and returns standardized metadata dictionaries. + This function handles both workflow templates and workflow runs, returning standardized metadata dictionaries. Args: result: The workflow or workflow_run object from PyAtlan (WorkflowSearchResult or Workflow). - include_dag: If True, includes the workflow DAG in the output. This only applies if the input is a workflow (WorkflowTemplate). + include_dag: If True, includes the workflow DAG in the output. This only applies if the input is a workflow template. For workflow_run instances, this parameter has no effect. Returns: Optional[Dict[str, Any]]: Serialized workflow or workflow_run dictionary using Pydantic's dict() method. @@ -60,13 +56,12 @@ def extract_workflow_template_metadata( dict_repr: Dict[str, Any], include_dag: bool = False ) -> Dict[str, Any]: """ - Extract useful metadata from a workflow (WorkflowTemplate). + Extract useful metadata from a workflow template. - Note: "WorkflowTemplate" (kind="WorkflowTemplate") is a workflow (template definition). 
This function extracts workflow metadata including package info, source system, certification status, etc. Args: - dict_repr: The workflow object from the workflows array (kind="WorkflowTemplate") + dict_repr: The workflow template object from the workflows array include_dag: If True, includes the workflow DAG (workflow_steps and workflow_spec) in the output. When False, returns only essential metadata fields. @@ -118,17 +113,14 @@ def extract_workflow_template_metadata( def extract_workflow_metadata(dict_repr: Dict[str, Any]) -> Dict[str, Any]: """ - Extract comprehensive metadata from a workflow_run (Workflow executed instance/run). - - Note: "Workflow" (kind="Workflow") is a workflow_run (executed instance/run). - This is different from "WorkflowTemplate" which is a workflow (template definition). + Extract comprehensive metadata from a workflow_run (executed instance/run). The returned dictionary uses a two-tier naming convention: - Fields prefixed with "run_*" contain execution-specific metadata (phase, timing, resource usage) - Fields prefixed with "workflow_*" contain workflow-level metadata (template reference, package, creator) Args: - dict_repr: The workflow_run object dictionary representation (kind="Workflow") + dict_repr: The workflow_run object dictionary representation Returns: Dictionary containing comprehensive workflow_run metadata with fields: @@ -198,7 +190,7 @@ def get_workflows( max_results: int = 10, ) -> Dict[str, Any]: """ - Retrieve workflows (WorkflowTemplate) by ID or by package type. + Retrieve workflows by ID or by package type. This function supports two main use cases: 1. Get a specific workflow by ID (returns full workflow details including steps and spec) @@ -215,7 +207,7 @@ def get_workflows( Returns: Dict[str, Any]: Dictionary containing: - - workflows: List of workflow (WorkflowTemplate) dictionaries. When getting by ID, contains single item. + - workflows: List of workflow dictionaries. When getting by ID, contains single item. 
- total: Total count of workflows (1 when getting by ID, actual count when getting by type) - error: None if no error occurred, otherwise the error message