diff --git a/README.md b/README.md index 4318d1c..f4fe872 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ RushTI transforms sequential TurboIntegrator execution into intelligent, paralle - **Self-Optimization** — EWMA-based learning reorders tasks from historical performance - **Checkpoint & Resume** — Automatic progress saving with failure recovery - **Exclusive Mode** — Prevents concurrent runs on shared TM1 servers -- **SQLite Statistics** — Persistent execution history with dashboards and analysis +- **Statistics Storage (SQLite or DynamoDB)** — Persistent execution history with dashboards and analysis - **TM1 Integration** — Read tasks from and write results to a TM1 cube - **100% Backwards Compatible** — Legacy TXT task files work without changes diff --git a/config/settings.ini.template b/config/settings.ini.template index da86b4d..284e0cd 100644 --- a/config/settings.ini.template +++ b/config/settings.ini.template @@ -196,23 +196,43 @@ # auto_resume = false # ------------------------------------------------------------------------------ -# [stats] - SQLite stats database for execution history +# [stats] - Execution history storage (SQLite or DynamoDB) # ------------------------------------------------------------------------------ [stats] # Enable the stats database for storing execution history -# The stats database stores execution statistics for: +# Stats storage stores execution statistics for: # - Optimization features (EWMA runtime estimation) # - TM1 cube logging data source # - Historical analysis via 'rushti db' commands # Default: false # enabled = false +# Storage backend: sqlite or dynamodb +# Default: sqlite +# backend = sqlite + # Path to the SQLite database file # Relative paths are resolved from the application directory # Default: data/rushti_stats.db # db_path = data/rushti_stats.db +# AWS region for DynamoDB backend (required when backend = dynamodb) +# Example: eu-west-1 +# dynamodb_region = eu-west-1 + +# DynamoDB runs table 
name (backend = dynamodb) +# Default: rushti_runs +# dynamodb_runs_table = rushti_runs + +# DynamoDB task results table name (backend = dynamodb) +# Default: rushti_task_results +# dynamodb_task_results_table = rushti_task_results + +# Optional custom DynamoDB endpoint URL (for LocalStack/testing) +# Example: http://localhost:4566 +# dynamodb_endpoint_url = + # Number of days to retain execution history # Valid range: 1-365 # Default: 90 diff --git a/docs/advanced/settings-reference.md b/docs/advanced/settings-reference.md index bd35806..46dc01f 100644 --- a/docs/advanced/settings-reference.md +++ b/docs/advanced/settings-reference.md @@ -184,12 +184,22 @@ Controls checkpoint saving and resume capability. When enabled, RushTI periodica ### [stats] -Controls the SQLite statistics database that stores execution history. The stats database powers several features: EWMA optimization, the `rushti stats` commands, dashboard visualization, and historical analysis. +Controls stats storage for execution history. RushTI supports two backends: +- `sqlite` (default): local file storage +- `dynamodb`: AWS DynamoDB tables + +Stats storage powers several features: EWMA optimization, the `rushti stats` commands, dashboard visualization, and historical analysis. | Setting | Type | Default | Description | |---------|------|---------|-------------| -| `enabled` | bool | `false` | Enable the SQLite stats database. When enabled, every run records task-level execution data (timing, status, errors). | -| `retention_days` | int | `90` | Days to keep execution history. Records older than this value are deleted at startup. Valid range: 1--365. Use `0` to keep data indefinitely. | +| `enabled` | bool | `false` | Enable stats storage. When enabled, every run records task-level execution data (timing, status, errors). | +| `backend` | str | `sqlite` | Storage backend: `sqlite` or `dynamodb`. | +| `db_path` | str | `data/rushti_stats.db` | SQLite file path (used when `backend = sqlite`). 
| +| `dynamodb_region` | str | (empty) | AWS region for DynamoDB (used when `backend = dynamodb`). | +| `dynamodb_runs_table` | str | `rushti_runs` | DynamoDB table name for run-level records. | +| `dynamodb_task_results_table` | str | `rushti_task_results` | DynamoDB table name for task-level records. | +| `dynamodb_endpoint_url` | str | (empty) | Optional custom endpoint URL (for local testing tools such as LocalStack). | +| `retention_days` | int | `90` | Days to keep execution history. Records older than this value are deleted at startup. Valid range: 1-365, or `0` to keep data indefinitely. | **Required by:** `[optimization]`, `rushti stats` commands, `rushti stats visualize` @@ -353,7 +363,7 @@ Copy this template to `config/settings.ini` and uncomment the settings you want # auto_resume = false # ------------------------------------------------------------------------------ -# [stats] - SQLite stats database for execution history +# [stats] - Execution history storage (SQLite or DynamoDB) # ------------------------------------------------------------------------------ [stats] @@ -365,11 +375,31 @@ Copy this template to `config/settings.ini` and uncomment the settings you want # Default: false # enabled = false +# Storage backend: sqlite or dynamodb +# Default: sqlite +# backend = sqlite + # Path to the SQLite database file # Relative paths are resolved from the application directory # Default: data/rushti_stats.db # db_path = data/rushti_stats.db +# AWS region for DynamoDB backend (required when backend = dynamodb) +# Example: eu-west-1 +# dynamodb_region = eu-west-1 + +# DynamoDB runs table name (backend = dynamodb) +# Default: rushti_runs +# dynamodb_runs_table = rushti_runs + +# DynamoDB task results table name (backend = dynamodb) +# Default: rushti_task_results +# dynamodb_task_results_table = rushti_task_results + +# Optional custom DynamoDB endpoint URL (for LocalStack/testing) +# Example: http://localhost:4566 +# dynamodb_endpoint_url = + # Number of days to 
retain execution history # Valid range: 1-365 # Default: 90 diff --git a/pyproject.toml b/pyproject.toml index 3f1984d..72026f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,9 @@ dependencies = [ ] [project.optional-dependencies] +dynamodb = [ + "boto3>=1.34.0", +] dev = [ "pytest>=7.0.0", "pytest-asyncio", diff --git a/src/rushti/cli.py b/src/rushti/cli.py index 5036ca7..6b0d568 100644 --- a/src/rushti/cli.py +++ b/src/rushti/cli.py @@ -960,12 +960,17 @@ def main() -> int: results = list() # Initialize stats database if enabled - from rushti.stats import get_db_path + from rushti.stats import get_db_path, get_stats_backend db_kwargs = dict( enabled=settings.stats.enabled, retention_days=settings.stats.retention_days, + backend=get_stats_backend(settings), db_path=get_db_path(settings), + dynamodb_region=settings.stats.dynamodb_region or None, + dynamodb_runs_table=settings.stats.dynamodb_runs_table, + dynamodb_task_results_table=settings.stats.dynamodb_task_results_table, + dynamodb_endpoint_url=settings.stats.dynamodb_endpoint_url or None, ) ctx = ExecutionContext( stats_db=create_stats_database(**db_kwargs), diff --git a/src/rushti/commands.py b/src/rushti/commands.py index e497d50..e28a615 100644 --- a/src/rushti/commands.py +++ b/src/rushti/commands.py @@ -1149,11 +1149,18 @@ def _stats_export(args) -> None: # Import here to avoid circular imports from rushti.tm1_integration import export_results_to_csv + from rushti.stats import get_db_path # Create stats database connection stats_db = create_stats_database( enabled=True, retention_days=settings.stats.retention_days, + backend=settings.stats.backend, + db_path=get_db_path(settings), + dynamodb_region=settings.stats.dynamodb_region or None, + dynamodb_runs_table=settings.stats.dynamodb_runs_table, + dynamodb_task_results_table=settings.stats.dynamodb_task_results_table, + dynamodb_endpoint_url=settings.stats.dynamodb_endpoint_url or None, ) try: @@ -1195,12 +1202,21 @@ def _stats_analyze(args) 
-> None: try: # Load settings from rushti.settings import load_settings - from rushti.stats import StatsDatabase, get_db_path + from rushti.stats import create_stats_database, get_db_path settings = load_settings(args.settings_file) # Initialize stats database - stats_db = StatsDatabase(db_path=get_db_path(settings), enabled=settings.stats.enabled) + stats_db = create_stats_database( + enabled=settings.stats.enabled, + retention_days=settings.stats.retention_days, + backend=settings.stats.backend, + db_path=get_db_path(settings), + dynamodb_region=settings.stats.dynamodb_region or None, + dynamodb_runs_table=settings.stats.dynamodb_runs_table, + dynamodb_task_results_table=settings.stats.dynamodb_task_results_table, + dynamodb_endpoint_url=settings.stats.dynamodb_endpoint_url or None, + ) # Run analysis report = analyze_runs( @@ -1276,7 +1292,7 @@ def _stats_optimize(args) -> None: write_optimized_taskfile, ) from rushti.settings import load_settings - from rushti.stats import StatsDatabase, get_db_path + from rushti.stats import create_stats_database, get_db_path from rushti.utils import resolve_app_path settings = load_settings(args.settings_file) @@ -1287,7 +1303,16 @@ def _stats_optimize(args) -> None: sys.exit(1) # Initialize stats database - stats_db = StatsDatabase(db_path=get_db_path(settings), enabled=True) + stats_db = create_stats_database( + enabled=True, + retention_days=settings.stats.retention_days, + backend=settings.stats.backend, + db_path=get_db_path(settings), + dynamodb_region=settings.stats.dynamodb_region or None, + dynamodb_runs_table=settings.stats.dynamodb_runs_table, + dynamodb_task_results_table=settings.stats.dynamodb_task_results_table, + dynamodb_endpoint_url=settings.stats.dynamodb_endpoint_url or None, + ) try: # Resolve taskfile: explicit --tasks flag, or auto-resolve from archive @@ -1592,16 +1617,31 @@ def _stats_visualize(args) -> None: from rushti.dashboard import generate_dashboard from rushti.db_admin import 
get_visualization_data from rushti.settings import load_settings - from rushti.stats import get_db_path + from rushti.stats import create_stats_database, get_db_path, get_stats_backend from rushti.utils import resolve_app_path settings = load_settings(getattr(args, "settings_file", None)) + backend = get_stats_backend(settings) db_path = get_db_path(settings) + stats_db = None try: - print(f"Generating visualizations for workflow: {args.workflow}") + stats_db = create_stats_database( + enabled=True, + backend=backend, + db_path=db_path, + dynamodb_region=settings.stats.dynamodb_region or None, + dynamodb_runs_table=settings.stats.dynamodb_runs_table, + dynamodb_task_results_table=settings.stats.dynamodb_task_results_table, + dynamodb_endpoint_url=settings.stats.dynamodb_endpoint_url or None, + ) + data = get_visualization_data( + args.workflow, + stats_db, + include_all_workflows=True, + ) - data = get_visualization_data(args.workflow, db_path) + print(f"Generating visualizations for workflow: {args.workflow}") if not data.get("exists"): print(f"Error: {data.get('message')}") sys.exit(1) @@ -1624,37 +1664,64 @@ def _stats_visualize(args) -> None: # --- Attempt DAG generation --- dag_generated = False - taskfile_path = None - # Find the first accessible taskfile_path from runs (most recent first) + # Prefer DB-based DAG (no taskfile on disk needed); fall back to taskfile if unavailable. 
runs = data["runs"] - for run in runs: - candidate = run.get("taskfile_path") - if candidate and not candidate.startswith("TM1:") and os.path.isfile(candidate): - taskfile_path = candidate - break + workflow_lower = args.workflow.lower() + workflow_runs = [r for r in runs if (r.get("workflow") or "").lower() == workflow_lower] + latest_run = workflow_runs[0] if workflow_runs else None - if taskfile_path: + dag_generated_from_db = False + if latest_run: try: - from rushti.taskfile_ops import visualize_dag - - # Ensure output directory exists - Path(dag_path).parent.mkdir(parents=True, exist_ok=True) - - visualize_dag( - source=taskfile_path, - output_path=dag_path, - dashboard_url=dashboard_filename, - ) - dag_generated = True - print(f"DAG visualization generated: {dag_path}") + from rushti.taskfile_ops import visualize_dag_from_db_results + + latest_run_id = latest_run["run_id"] + latest_task_results = [ + tr for tr in data["task_results"] if tr["run_id"] == latest_run_id + ] + + if latest_task_results: + Path(dag_path).parent.mkdir(parents=True, exist_ok=True) + visualize_dag_from_db_results( + task_results=latest_task_results, + output_path=dag_path, + dashboard_url=dashboard_filename, + ) + dag_generated = True + dag_generated_from_db = True + print(f"DAG visualization generated: {dag_path}") except Exception as e: - logger.warning(f"Could not generate DAG visualization: {e}") - print(f"Warning: DAG visualization skipped ({e})") - else: - print( - "Warning: No accessible taskfile found in run history, skipping DAG visualization" - ) + logger.warning(f"Could not generate DAG from DB: {e}") + + if not dag_generated_from_db: + # Fall back to taskfile on disk (most recent accessible one for this workflow) + taskfile_path = None + for run in workflow_runs: + candidate = run.get("taskfile_path") + if candidate and not candidate.startswith("TM1:") and os.path.isfile(candidate): + taskfile_path = candidate + break + + if taskfile_path: + try: + from rushti.taskfile_ops 
import visualize_dag + + Path(dag_path).parent.mkdir(parents=True, exist_ok=True) + visualize_dag( + source=taskfile_path, + output_path=dag_path, + dashboard_url=dashboard_filename, + ) + dag_generated = True + print(f"DAG visualization generated from taskfile: {dag_path}") + except Exception as e: + logger.warning(f"Could not generate DAG visualization: {e}") + print(f"Warning: DAG visualization skipped ({e})") + else: + print( + "Warning: No DB task results or accessible taskfile found, skipping DAG visualization" + ) # --- Generate dashboard --- output_file = generate_dashboard( @@ -1682,6 +1749,9 @@ def _stats_visualize(args) -> None: traceback.print_exc() sys.exit(1) + finally: + if stats_db is not None: + stats_db.close() def _stats_list(args) -> None: @@ -1693,14 +1763,26 @@ def _stats_list(args) -> None: """ from rushti.db_admin import list_runs, list_tasks from rushti.settings import load_settings - from rushti.stats import get_db_path + from rushti.stats import create_stats_database, get_db_path, get_stats_backend settings = load_settings(getattr(args, "settings_file", None)) + backend = get_stats_backend(settings) db_path = get_db_path(settings) + stats_db = None try: + stats_db = create_stats_database( + enabled=True, + backend=backend, + db_path=db_path, + dynamodb_region=settings.stats.dynamodb_region or None, + dynamodb_runs_table=settings.stats.dynamodb_runs_table, + dynamodb_task_results_table=settings.stats.dynamodb_task_results_table, + dynamodb_endpoint_url=settings.stats.dynamodb_endpoint_url or None, + ) + if args.list_type == "runs": - runs = list_runs(args.workflow, db_path, limit=args.limit) + runs = list_runs(args.workflow, stats_db, limit=args.limit) if not runs: print(f"No runs found for workflow: {args.workflow}") sys.exit(0) @@ -1719,7 +1801,7 @@ def _stats_list(args) -> None: ) elif args.list_type == "tasks": - tasks = list_tasks(args.workflow, db_path) + tasks = list_tasks(args.workflow, stats_db) if not tasks: print(f"No tasks 
found for workflow: {args.workflow}") sys.exit(0) @@ -1747,6 +1829,9 @@ def _stats_list(args) -> None: except Exception as e: print(f"Error: {e}") sys.exit(1) + finally: + if stats_db is not None: + stats_db.close() def run_db_command(argv: list) -> None: @@ -1778,7 +1863,7 @@ def run_db_command(argv: list) -> None: show_task_history, ) from rushti.settings import load_settings - from rushti.stats import get_db_path + from rushti.stats import get_db_path, get_stats_backend # Check for help flag or no subcommand if len(argv) < 3 or (len(argv) == 3 and argv[2] in ("--help", "-h")): @@ -1848,6 +1933,9 @@ def run_db_command(argv: list) -> None: # Get database path from settings settings = load_settings(args.settings_file) + if get_stats_backend(settings) != "sqlite": + print("Error: 'db' command currently supports only [stats] backend = sqlite") + sys.exit(1) db_path = get_db_path(settings) try: diff --git a/src/rushti/contention_analyzer.py b/src/rushti/contention_analyzer.py index 06f6a99..4ae0d6a 100644 --- a/src/rushti/contention_analyzer.py +++ b/src/rushti/contention_analyzer.py @@ -19,10 +19,15 @@ import math from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from rushti.stats import StatsDatabase +if TYPE_CHECKING: + from rushti.stats import DynamoDBStatsDatabase + +AnyStatsDatabase = Union[StatsDatabase, "DynamoDBStatsDatabase"] + logger = logging.getLogger(__name__) @@ -81,7 +86,7 @@ def light_task_count(self) -> int: def _compute_ewma_durations( - stats_db: StatsDatabase, + stats_db: AnyStatsDatabase, workflow: str, lookback_runs: int = 10, alpha: float = 0.3, @@ -119,7 +124,7 @@ def _compute_ewma_durations( def _get_task_parameters( - stats_db: StatsDatabase, + stats_db: AnyStatsDatabase, workflow: str, ) -> List[Dict[str, Any]]: """Get task_id, task_signature, process, and parameters for the most recent run. 
@@ -128,29 +133,33 @@ def _get_task_parameters( :param workflow: Workflow name :return: List of dicts with task_id, task_signature, process, parameters (parsed) """ - cursor = stats_db._conn.cursor() - cursor.execute( - """ - SELECT task_id, task_signature, process, parameters - FROM task_results - WHERE run_id = ( - SELECT run_id FROM runs - WHERE workflow = ? AND status = 'Success' - ORDER BY start_time DESC LIMIT 1 - ) - ORDER BY CAST(task_id AS INTEGER) - """, - (workflow,), - ) + runs = stats_db.get_runs_for_workflow(workflow) + successful_run = next((r for r in runs if r.get("status") == "Success"), None) + if not successful_run: + return [] + + run_id = successful_run.get("run_id") + if not run_id: + return [] + + run_results = stats_db.get_run_results(run_id) + + def _task_sort_key(result: Dict[str, Any]) -> Any: + task_id = str(result.get("task_id", "")) + if task_id.isdigit(): + return int(task_id) + return task_id + + run_results.sort(key=_task_sort_key) results = [] - for row in cursor.fetchall(): - params = json.loads(row["parameters"]) if row["parameters"] else {} + for row in run_results: + params = json.loads(row["parameters"]) if row.get("parameters") else {} results.append( { - "task_id": row["task_id"], - "task_signature": row["task_signature"], - "process": row["process"], + "task_id": row.get("task_id"), + "task_signature": row.get("task_signature"), + "process": row.get("process"), "parameters": params, } ) @@ -429,7 +438,7 @@ def _round_to_5(value: float) -> int: def _detect_concurrency_ceiling( - stats_db: StatsDatabase, + stats_db: AnyStatsDatabase, workflow: str, min_correlation: float = 0.7, max_efficiency_ratio: float = 0.75, @@ -655,7 +664,7 @@ def _detect_concurrency_ceiling( def analyze_contention( - stats_db: StatsDatabase, + stats_db: AnyStatsDatabase, workflow: str, task_params: Optional[List[Dict[str, Any]]] = None, sensitivity: float = 10.0, @@ -870,7 +879,7 @@ def analyze_contention( def get_archived_taskfile_path( - stats_db: 
StatsDatabase, + stats_db: AnyStatsDatabase, workflow: str, ) -> Optional[str]: """Get the archived taskfile path from the most recent successful run. @@ -883,18 +892,14 @@ def get_archived_taskfile_path( :param workflow: Workflow name :return: Path to archived JSON taskfile, or None if no successful runs exist """ - cursor = stats_db._conn.cursor() - cursor.execute( - """ - SELECT taskfile_path FROM runs - WHERE workflow = ? AND status = 'Success' - ORDER BY start_time DESC LIMIT 1 - """, - (workflow,), - ) - row = cursor.fetchone() - if row and row["taskfile_path"]: - return row["taskfile_path"] + runs = stats_db.get_runs_for_workflow(workflow) + successful_run = next((r for r in runs if r.get("status") == "Success"), None) + if successful_run: + run_id = successful_run.get("run_id") + if run_id: + run_info = stats_db.get_run_info(run_id) + if run_info and run_info.get("taskfile_path"): + return run_info["taskfile_path"] return None diff --git a/src/rushti/dashboard.py b/src/rushti/dashboard.py index 521f5b2..e46a11b 100644 --- a/src/rushti/dashboard.py +++ b/src/rushti/dashboard.py @@ -195,6 +195,7 @@ def _prepare_dashboard_data( runs: List[Dict[str, Any]], task_results: List[Dict[str, Any]], default_runs: int, + selected_workflow: Optional[str] = None, ) -> Dict[str, Any]: """Prepare all data for the dashboard template. 
@@ -211,6 +212,10 @@ def _prepare_dashboard_data( tasks_by_run[run_id] = [] tasks_by_run[run_id].append(tr) + # Normalize workflow names to lowercase for case-insensitive aggregation + if selected_workflow is not None: + selected_workflow = selected_workflow.lower() + # Build enriched run data enriched_runs = [] for run in runs: @@ -221,6 +226,7 @@ def _prepare_dashboard_data( enriched_runs.append( { **run, + "workflow": (run.get("workflow") or "").lower(), "stats": run_stats, "concurrency": concurrency, "task_count_actual": len(run_tasks), @@ -330,16 +336,38 @@ def _prepare_dashboard_data( } ) - # Taskfile metadata from the most recent run - latest = runs[0] if runs else {} + # Workflow metadata from the most recent run per workflow. + workflow_meta: Dict[str, Dict[str, Any]] = {} + for run in enriched_runs: + wf = run.get("workflow") or "" + if wf not in workflow_meta: + workflow_meta[wf] = { + "taskfile_name": run.get("taskfile_name", ""), + "taskfile_description": run.get("taskfile_description", ""), + "taskfile_author": run.get("taskfile_author", ""), + "run_count": 0, + } + workflow_meta[wf]["run_count"] += 1 + + workflows = list(workflow_meta.keys()) + effective_workflow = selected_workflow if selected_workflow in workflow_meta else "" + if not effective_workflow and workflows: + effective_workflow = workflows[0] + + workflow_run_count = workflow_meta.get(effective_workflow, {}).get("run_count", 0) + effective_default_runs = min(default_runs, workflow_run_count) return { - "workflow": latest.get("workflow", ""), - "taskfile_name": latest.get("taskfile_name", ""), - "taskfile_description": latest.get("taskfile_description", ""), - "taskfile_author": latest.get("taskfile_author", ""), + "workflow": effective_workflow, + "workflows": workflows, + "workflow_meta": workflow_meta, + "taskfile_name": workflow_meta.get(effective_workflow, {}).get("taskfile_name", ""), + "taskfile_description": workflow_meta.get(effective_workflow, {}).get( + "taskfile_description", 
"" + ), + "taskfile_author": workflow_meta.get(effective_workflow, {}).get("taskfile_author", ""), "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "default_runs": min(default_runs, len(runs)), + "default_runs": effective_default_runs, "total_runs": len(runs), "runs": enriched_runs, "task_results": slim_task_results, @@ -367,7 +395,12 @@ def generate_dashboard( :param dag_url: Optional relative URL to the DAG visualization HTML :return: Path to the generated HTML file """ - data = _prepare_dashboard_data(runs, task_results, default_runs) + data = _prepare_dashboard_data( + runs, + task_results, + default_runs, + selected_workflow=workflow, + ) data_json = json.dumps(data, default=str) # Build conditional DAG link HTML @@ -623,6 +656,10 @@ def generate_dashboard(
{dag_link_html} +
+ + +
@@ -758,16 +795,54 @@ def generate_dashboard( ]; function init() {{ - // Build run count selector options + // Build workflow selector first, then run selector for the selected workflow. + const workflowSel = document.getElementById('workflowSelect'); + DATA.workflows.forEach(wf => {{ + const opt = document.createElement('option'); + const wfRunCount = (DATA.workflow_meta[wf] && DATA.workflow_meta[wf].run_count) || 0; + opt.value = wf; + opt.textContent = `${{wf}} (${{wfRunCount}})`; + workflowSel.appendChild(opt); + }}); + if (DATA.workflow && DATA.workflows.includes(DATA.workflow)) {{ + workflowSel.value = DATA.workflow; + }} + + rebuildRunCountSelector(DATA.default_runs); + + document.getElementById('generatedAt').textContent = 'Generated: ' + DATA.generated_at; + updateDashboard(); + }} + + function getSelectedWorkflow() {{ + return document.getElementById('workflowSelect').value; + }} + + function getRunsForWorkflow(workflow) {{ + return DATA.runs.filter(r => (r.workflow || '') === workflow); + }} + + function rebuildRunCountSelector(preferredCount) {{ const sel = document.getElementById('runCount'); - const total = DATA.total_runs; + const total = getRunsForWorkflow(getSelectedWorkflow()).length; + sel.innerHTML = ''; + + if (total === 0) {{ + const opt = document.createElement('option'); + opt.value = 0; + opt.textContent = '0'; + sel.appendChild(opt); + sel.value = 0; + return; + }} + const options = []; for (let i = 1; i <= Math.min(total, 5); i++) options.push(i); if (total > 5) options.push(5); if (total > 10) options.push(10); if (total > 20) options.push(20); if (total > 5) options.push(total); - // Deduplicate and sort + const unique = [...new Set(options)].sort((a, b) => a - b); unique.forEach(n => {{ const opt = document.createElement('option'); @@ -775,16 +850,23 @@ def generate_dashboard( opt.textContent = n === total ? 
`All (${{total}})` : n; sel.appendChild(opt); }}); - sel.value = DATA.default_runs; - document.getElementById('generatedAt').textContent = 'Generated: ' + DATA.generated_at; + const defaultCount = preferredCount || DATA.default_runs || unique[0]; + const boundedCount = Math.min(defaultCount, total); + sel.value = String(boundedCount); + }} + + function onWorkflowChange() {{ + rebuildRunCountSelector(); updateDashboard(); }} function getSelectedRuns() {{ const n = parseInt(document.getElementById('runCount').value); + const workflowRuns = getRunsForWorkflow(getSelectedWorkflow()); + if (!n) return []; // Runs are ordered DESC (newest first); take last N and reverse for chronological display - return DATA.runs.slice(0, n).reverse(); + return workflowRuns.slice(0, n).reverse(); }} function statusBadge(status) {{ @@ -815,17 +897,21 @@ def generate_dashboard( function updateDashboard() {{ destroyCharts(); + const selectedWorkflow = getSelectedWorkflow(); + const selectedWorkflowMeta = DATA.workflow_meta[selectedWorkflow] || {{}}; + const workflowTotalRuns = getRunsForWorkflow(selectedWorkflow).length; const runs = getSelectedRuns(); // Header subtitle document.getElementById('headerSubtitle').textContent = - `${{DATA.workflow}}${{DATA.taskfile_name ? ' — ' + DATA.taskfile_name : ''}}`; + `${{selectedWorkflow}}${{selectedWorkflowMeta.taskfile_name ? 
' - ' + selectedWorkflowMeta.taskfile_name : ''}}`; // Metadata const meta = document.getElementById('metadata'); let metaHtml = ''; - if (DATA.taskfile_description) metaHtml += `${{DATA.taskfile_description}}`; - if (DATA.taskfile_author) metaHtml += `Author: ${{DATA.taskfile_author}}`; + if (selectedWorkflowMeta.taskfile_description) metaHtml += `${{selectedWorkflowMeta.taskfile_description}}`; + if (selectedWorkflowMeta.taskfile_author) metaHtml += `Author: ${{selectedWorkflowMeta.taskfile_author}}`; + metaHtml += `Workflow runs: ${{workflowTotalRuns}}`; metaHtml += `Total runs in DB: ${{DATA.total_runs}}`; if (runs.length > 0) {{ metaHtml += `Date range: ${{formatDate(runs[0].start_time)}} — ${{formatDate(runs[runs.length-1].start_time)}}`; @@ -833,7 +919,7 @@ def generate_dashboard( meta.innerHTML = metaHtml; // Summary cards - updateSummaryCards(runs); + updateSummaryCards(runs, workflowTotalRuns); // Charts renderRunDurationChart(runs); @@ -850,7 +936,7 @@ def generate_dashboard( updateConfigDetails(runs); }} - function updateSummaryCards(runs) {{ + function updateSummaryCards(runs, workflowTotalRuns) {{ const cards = document.getElementById('summaryCards'); if (runs.length === 0) {{ cards.innerHTML = '
No runs available
'; return; }} @@ -866,7 +952,7 @@ def generate_dashboard(
Runs Shown
${{runs.length}}
-
of ${{DATA.total_runs}} total
+
of ${{workflowTotalRuns}} in workflow
Tasks per Run
diff --git a/src/rushti/db_admin.py b/src/rushti/db_admin.py index f4c6ee7..581e45a 100644 --- a/src/rushti/db_admin.py +++ b/src/rushti/db_admin.py @@ -15,11 +15,31 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -from rushti.stats import DEFAULT_DB_PATH +from rushti.stats import DEFAULT_DB_PATH # used by SQLite-only admin functions logger = logging.getLogger(__name__) +def _to_float(value: Any) -> Optional[float]: + """Best-effort numeric conversion used by backend dispatch helpers.""" + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _to_int(value: Any) -> Optional[int]: + """Best-effort integer conversion used by backend dispatch helpers.""" + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + def get_db_stats(db_path: str = DEFAULT_DB_PATH) -> Dict[str, Any]: """Get overall database statistics. @@ -209,109 +229,250 @@ def list_workflows(db_path: str = DEFAULT_DB_PATH) -> List[Dict[str, Any]]: def list_runs( - workflow: str, db_path: str = DEFAULT_DB_PATH, limit: int = 20 + workflow: str, + backend: Any = DEFAULT_DB_PATH, + limit: int = 20, ) -> List[Dict[str, Any]]: """List runs for a specific workflow. 
:param workflow: Workflow name to query - :param db_path: Path to SQLite database + :param backend: Stats backend (StatsDatabase or DynamoDBStatsDatabase), or SQLite db path :param limit: Maximum number of runs to return :return: List of run summaries """ - if not Path(db_path).exists(): - return [] + if isinstance(backend, str): + db_path = backend + if not Path(db_path).exists(): + return [] - conn = sqlite3.connect(db_path) - cursor = conn.cursor() + conn = sqlite3.connect(db_path) + cursor = conn.cursor() - # Query from runs table for run-level data, with task stats from task_results - cursor.execute( - """ - SELECT - r.run_id, - r.start_time, - r.end_time, - r.duration_seconds, - r.task_count, - r.success_count, - (SELECT SUM(duration_seconds) FROM task_results WHERE run_id = r.run_id) as total_task_duration - FROM runs r - WHERE r.workflow = ? - ORDER BY r.start_time DESC - LIMIT ? - """, - (workflow, limit), - ) + cursor.execute( + """ + SELECT + r.run_id, + r.start_time, + r.end_time, + r.duration_seconds, + r.task_count, + r.success_count, + (SELECT SUM(duration_seconds) FROM task_results WHERE run_id = r.run_id) as total_duration + FROM runs r + WHERE r.workflow = ? + ORDER BY r.start_time DESC + LIMIT ? 
+ """, + (workflow, limit), + ) + + runs = [] + for row in cursor.fetchall(): + task_count = row[4] or 0 + success_count = row[5] or 0 + success_rate = (success_count / task_count * 100) if task_count > 0 else 0 + total_duration = round(row[6], 2) if row[6] else 0 + runs.append( + { + "run_id": row[0], + "start_time": row[1], + "end_time": row[2], + "duration_seconds": round(row[3], 2) if row[3] else None, + "task_count": task_count, + "success_count": success_count, + "success_rate": round(success_rate, 1), + "total_duration": total_duration, + "total_task_duration": total_duration, + } + ) + + conn.close() + return runs + + runs_summary = backend.get_runs_for_workflow(workflow) or [] + if limit > 0: + runs_summary = runs_summary[:limit] + + runs: List[Dict[str, Any]] = [] + for run in runs_summary: + run_id = run.get("run_id") + task_count = _to_int(run.get("task_count")) or 0 + success_count = _to_int(run.get("success_count")) + + run_results = backend.get_run_results(run_id) if run_id else [] + total_duration = 0.0 + computed_success = 0 + + for result in run_results: + duration = _to_float(result.get("duration_seconds")) + if duration is not None: + total_duration += duration + if result.get("status") == "Success": + computed_success += 1 + + if success_count is None: + success_count = computed_success + + # Fall back to observed task results when run metadata is incomplete. 
+ if task_count == 0 and run_results: + task_count = len(run_results) - runs = [] - for row in cursor.fetchall(): - task_count = row[4] or 0 - success_count = row[5] or 0 success_rate = (success_count / task_count * 100) if task_count > 0 else 0 + + duration_seconds = _to_float(run.get("duration_seconds")) runs.append( { - "run_id": row[0], - "start_time": row[1], - "end_time": row[2], - "duration_seconds": round(row[3], 2) if row[3] else None, + "run_id": run_id, + "start_time": run.get("start_time"), + "end_time": run.get("end_time"), + "duration_seconds": ( + round(duration_seconds, 2) if duration_seconds is not None else None + ), "task_count": task_count, "success_count": success_count, "success_rate": round(success_rate, 1), - "total_task_duration": round(row[6], 2) if row[6] else 0, + "total_duration": round(total_duration, 2), + # Keep legacy key for compatibility with older callers. + "total_task_duration": round(total_duration, 2), } ) - conn.close() return runs -def list_tasks(workflow: str, db_path: str = DEFAULT_DB_PATH) -> List[Dict[str, Any]]: +def list_tasks( + workflow: str, + backend: Any = DEFAULT_DB_PATH, +) -> List[Dict[str, Any]]: """List unique tasks for a specific workflow. :param workflow: Workflow name to query - :param db_path: Path to SQLite database + :param backend: Stats backend (StatsDatabase or DynamoDBStatsDatabase), or SQLite db path :return: List of task summaries """ - if not Path(db_path).exists(): - return [] + if isinstance(backend, str): + db_path = backend + if not Path(db_path).exists(): + return [] - conn = sqlite3.connect(db_path) - cursor = conn.cursor() + conn = sqlite3.connect(db_path) + cursor = conn.cursor() - cursor.execute( - """ - SELECT - task_signature, - task_id, - instance, - process, - COUNT(*) as run_count, - AVG(duration_seconds) as avg_duration, - COUNT(CASE WHEN status = 'Success' THEN 1 END) as success_count - FROM task_results - WHERE workflow = ? 
- GROUP BY task_signature - ORDER BY task_id - """, - (workflow,), - ) + cursor.execute( + """ + SELECT + task_signature, + task_id, + instance, + process, + COUNT(*) as run_count, + AVG(duration_seconds) as avg_duration, + COUNT(CASE WHEN status = 'Success' THEN 1 END) as success_count + FROM task_results + WHERE workflow = ? + GROUP BY task_signature + ORDER BY task_id + """, + (workflow,), + ) - tasks = [] - for row in cursor.fetchall(): - success_rate = (row[6] / row[4] * 100) if row[4] > 0 else 0 + tasks = [] + for row in cursor.fetchall(): + success_rate = (row[6] / row[4] * 100) if row[4] > 0 else 0 + tasks.append( + { + "task_signature": row[0], + "task_id": row[1], + "instance": row[2], + "process": row[3], + "run_count": row[4], + "execution_count": row[4], + "avg_duration": round(row[5], 2) if row[5] else 0, + "success_rate": round(success_rate, 1), + } + ) + + conn.close() + return tasks + + signatures = backend.get_workflow_signatures(workflow) or [] + runs = backend.get_runs_for_workflow(workflow) or [] + + tasks_by_signature: Dict[str, Dict[str, Any]] = { + sig: { + "task_signature": sig, + "task_id": "", + "instance": "", + "process": "", + "run_count": 0, + "success_count": 0, + "duration_total": 0.0, + "duration_count": 0, + } + for sig in signatures + } + + for run in runs: + run_id = run.get("run_id") + if not run_id: + continue + for result in backend.get_run_results(run_id): + signature = result.get("task_signature") + if not signature: + continue + + entry = tasks_by_signature.setdefault( + signature, + { + "task_signature": signature, + "task_id": "", + "instance": "", + "process": "", + "run_count": 0, + "success_count": 0, + "duration_total": 0.0, + "duration_count": 0, + }, + ) + + if not entry["task_id"]: + entry["task_id"] = result.get("task_id") or "" + if not entry["instance"]: + entry["instance"] = result.get("instance") or "" + if not entry["process"]: + entry["process"] = result.get("process") or "" + + entry["run_count"] += 1 + if 
result.get("status") == "Success": + entry["success_count"] += 1 + + duration = _to_float(result.get("duration_seconds")) + if duration is not None: + entry["duration_total"] += duration + entry["duration_count"] += 1 + + tasks: List[Dict[str, Any]] = [] + for signature, row in tasks_by_signature.items(): + run_count = row["run_count"] + success_rate = (row["success_count"] / run_count * 100) if run_count > 0 else 0 + avg_duration = ( + row["duration_total"] / row["duration_count"] if row["duration_count"] > 0 else 0 + ) tasks.append( { - "task_signature": row[0], - "task_id": row[1], - "instance": row[2], - "process": row[3], - "run_count": row[4], - "avg_duration": round(row[5], 2) if row[5] else 0, + "task_signature": signature, + "task_id": row["task_id"], + "instance": row["instance"], + "process": row["process"], + "run_count": run_count, + # Keep CLI-facing alias expected by current commands output. + "execution_count": run_count, + "avg_duration": round(avg_duration, 2), "success_rate": round(success_rate, 1), } ) - conn.close() + tasks.sort(key=lambda t: t["task_id"]) return tasks @@ -695,76 +856,205 @@ def show_task_history( } -def get_visualization_data(workflow: str, db_path: str = DEFAULT_DB_PATH) -> Dict[str, Any]: +def get_visualization_data( + workflow: str, + backend: Any = DEFAULT_DB_PATH, + include_all_workflows: bool = False, +) -> Dict[str, Any]: """Get all data needed for the HTML dashboard visualization. - Returns all runs and their task results for a workflow in a single - efficient query batch. + Returns all runs and their task results for a workflow. :param workflow: Workflow name to query - :param db_path: Path to SQLite database + :param backend: Stats backend (StatsDatabase or DynamoDBStatsDatabase), or SQLite db path + :param include_all_workflows: When True, return runs/tasks for all workflows + but keep workflow as the initial/default selection. 
:return: Dictionary with 'runs' and 'task_results' lists, or error info """ - if not Path(db_path).exists(): - return {"exists": False, "message": f"Database not found: {db_path}"} + if isinstance(backend, str): + db_path = backend + if not Path(db_path).exists(): + return {"exists": False, "message": f"Database not found: {db_path}"} + + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + if include_all_workflows: + cursor.execute(""" + SELECT run_id, workflow, taskfile_path, start_time, end_time, + duration_seconds, status, task_count, success_count, failure_count, + taskfile_name, taskfile_description, taskfile_author, + max_workers, retries, result_file, exclusive, optimize + FROM runs + ORDER BY start_time DESC + """) + else: + cursor.execute( + """ + SELECT run_id, workflow, taskfile_path, start_time, end_time, + duration_seconds, status, task_count, success_count, failure_count, + taskfile_name, taskfile_description, taskfile_author, + max_workers, retries, result_file, exclusive, optimize + FROM runs + WHERE workflow = ? 
+ ORDER BY start_time DESC + """, + (workflow,), + ) + runs_rows = cursor.fetchall() + + if not runs_rows: + conn.close() + return { + "exists": False, + "message": f"No runs found for workflow: {workflow}", + } - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() + runs = [] + run_ids = [] + has_selected_workflow = False + for row in runs_rows: + run = dict(row) + for field in ["exclusive", "optimize"]: + if field in run and run[field] is not None: + run[field] = bool(run[field]) + runs.append(run) + run_ids.append(run["run_id"]) + if (run.get("workflow") or "").lower() == workflow.lower(): + has_selected_workflow = True + + if include_all_workflows and workflow and not has_selected_workflow: + conn.close() + return { + "exists": False, + "message": f"No runs found for workflow: {workflow}", + } - # Get all runs for this taskfile - cursor.execute( - """ - SELECT run_id, workflow, taskfile_path, start_time, end_time, - duration_seconds, status, task_count, success_count, failure_count, - taskfile_name, taskfile_description, taskfile_author, - max_workers, retries, result_file, exclusive, optimize - FROM runs - WHERE workflow = ? - ORDER BY start_time DESC - """, - (workflow,), - ) - runs_rows = cursor.fetchall() + placeholders = ",".join("?" 
* len(run_ids)) + cursor.execute( + f""" + SELECT run_id, task_id, task_signature, instance, process, parameters, + status, start_time, end_time, duration_seconds, retry_count, + error_message, predecessors, stage + FROM task_results + WHERE run_id IN ({placeholders}) + ORDER BY run_id, id + """, + run_ids, + ) + + task_results = [dict(row) for row in cursor.fetchall()] - if not runs_rows: conn.close() + + return { + "exists": True, + "workflow": workflow, + "runs": runs, + "task_results": task_results, + } + + if include_all_workflows: + runs_summary = backend.get_all_runs() or [] + else: + runs_summary = backend.get_runs_for_workflow(workflow) or [] + if not runs_summary: return { "exists": False, "message": f"No runs found for workflow: {workflow}", } - runs = [] - run_ids = [] - for row in runs_rows: - run = dict(row) - # Convert SQLite integers to Python booleans - for field in ["exclusive", "optimize"]: - if field in run and run[field] is not None: - run[field] = bool(run[field]) + runs: List[Dict[str, Any]] = [] + task_results: List[Dict[str, Any]] = [] + has_selected_workflow = False + + for summary in runs_summary: + run_id = summary.get("run_id") + if not run_id: + continue + + run_info = backend.get_run_info(run_id) or {} + run_workflow = run_info.get("workflow") or summary.get("workflow") or workflow + if run_workflow.lower() == workflow.lower(): + has_selected_workflow = True + run = { + "run_id": run_id, + "workflow": run_workflow, + "taskfile_path": run_info.get("taskfile_path"), + "start_time": run_info.get("start_time") or summary.get("start_time"), + "end_time": run_info.get("end_time") or summary.get("end_time"), + "duration_seconds": _to_float( + run_info.get("duration_seconds") + if run_info.get("duration_seconds") is not None + else summary.get("duration_seconds") + ), + "status": run_info.get("status") or summary.get("status"), + "task_count": _to_int( + run_info.get("task_count") + if run_info.get("task_count") is not None + else 
summary.get("task_count") + ), + "success_count": _to_int( + run_info.get("success_count") + if run_info.get("success_count") is not None + else summary.get("success_count") + ), + "failure_count": _to_int( + run_info.get("failure_count") + if run_info.get("failure_count") is not None + else summary.get("failure_count") + ), + "taskfile_name": run_info.get("taskfile_name"), + "taskfile_description": run_info.get("taskfile_description"), + "taskfile_author": run_info.get("taskfile_author"), + "max_workers": _to_int( + run_info.get("max_workers") + if run_info.get("max_workers") is not None + else summary.get("max_workers") + ), + "retries": _to_int(run_info.get("retries")), + "result_file": run_info.get("result_file"), + "exclusive": ( + bool(run_info["exclusive"]) if run_info.get("exclusive") is not None else None + ), + "optimize": ( + bool(run_info["optimize"]) if run_info.get("optimize") is not None else None + ), + } runs.append(run) - run_ids.append(run["run_id"]) - # Get all task results for these runs - placeholders = ",".join("?" 
* len(run_ids)) - cursor.execute( - f""" - SELECT run_id, task_id, task_signature, instance, process, parameters, - status, start_time, end_time, duration_seconds, retry_count, - error_message, predecessors, stage - FROM task_results - WHERE run_id IN ({placeholders}) - ORDER BY run_id, id - """, - run_ids, - ) - - task_results = [] - for row in cursor.fetchall(): - result = dict(row) - task_results.append(result) + for result in backend.get_run_results(run_id): + task_results.append( + { + "run_id": run_id, + "task_id": result.get("task_id"), + "task_signature": result.get("task_signature"), + "instance": result.get("instance"), + "process": result.get("process"), + "parameters": result.get("parameters"), + "status": result.get("status"), + "start_time": result.get("start_time"), + "end_time": result.get("end_time"), + "duration_seconds": _to_float(result.get("duration_seconds")), + "retry_count": _to_int(result.get("retry_count")) or 0, + "error_message": result.get("error_message"), + "predecessors": result.get("predecessors"), + "stage": result.get("stage"), + } + ) + + if not runs: + return { + "exists": False, + "message": f"No runs found for workflow: {workflow}", + } - conn.close() + if include_all_workflows and workflow and not has_selected_workflow: + return { + "exists": False, + "message": f"No runs found for workflow: {workflow}", + } return { "exists": True, diff --git a/src/rushti/exclusive.py b/src/rushti/exclusive.py index 94027a5..74574c6 100644 --- a/src/rushti/exclusive.py +++ b/src/rushti/exclusive.py @@ -23,7 +23,7 @@ # Context field format patterns RUSHTI_CONTEXT_PREFIX = "RushTI" RUSHTI_EXCLUSIVE_PREFIX = "RushTIX" -CONTEXT_PATTERN = re.compile(r"^RushTI(X?)(.*)$") +CONTEXT_PATTERN = re.compile(r"^RushTI(X?)_(.*)$") # TM1 context field maximum length (64 characters) TM1_CONTEXT_MAX_LENGTH = 64 @@ -50,10 +50,10 @@ def build_session_context(workflow: str = "", exclusive: bool = False) -> str: :param workflow: The workflow name (may be empty) 
:param exclusive: Whether this is an exclusive mode session :return: Context string (e.g., "RushTI_daily-etl" or "RushTIX_daily-etl", - or "RushTI" / "RushTIX" when workflow is empty) + or "RushTI_" / "RushTIX_" when workflow is empty) """ prefix = RUSHTI_EXCLUSIVE_PREFIX if exclusive else RUSHTI_CONTEXT_PREFIX - context = f"{prefix}_{workflow}" if workflow else prefix + context = f"{prefix}_{workflow}" # Truncate if exceeds TM1 limit if len(context) > TM1_CONTEXT_MAX_LENGTH: diff --git a/src/rushti/settings.py b/src/rushti/settings.py index acd57dd..b7fab9f 100644 --- a/src/rushti/settings.py +++ b/src/rushti/settings.py @@ -102,9 +102,13 @@ class ResumeSettings: @dataclass class StatsSettings: - """SQLite stats database settings. + """Stats storage settings. - The stats database stores execution statistics for: + Supported backends: + - sqlite (default): local SQLite file + - dynamodb: AWS DynamoDB tables + + Stats storage stores execution statistics for: - Optimization features (EWMA runtime estimation) - TM1 cube logging data source - Historical analysis @@ -112,7 +116,12 @@ class StatsSettings: enabled: bool = False retention_days: int = 90 + backend: str = "sqlite" db_path: str = "" + dynamodb_region: str = "" + dynamodb_runs_table: str = "rushti_runs" + dynamodb_task_results_table: str = "rushti_task_results" + dynamodb_endpoint_url: str = "" @dataclass @@ -165,7 +174,12 @@ class Settings: "stats": { "enabled": bool, "retention_days": int, + "backend": str, "db_path": str, + "dynamodb_region": str, + "dynamodb_runs_table": str, + "dynamodb_task_results_table": str, + "dynamodb_endpoint_url": str, }, } @@ -173,6 +187,7 @@ class Settings: VALID_VALUES = { "mode": ["norm", "opt"], "level": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + "backend": ["sqlite", "dynamodb"], } diff --git a/src/rushti/stats.py b/src/rushti/stats.py index 734cae1..63f8b6c 100644 --- a/src/rushti/stats.py +++ b/src/rushti/stats.py @@ -17,8 +17,10 @@ import logging import os 
import sqlite3 +import uuid from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from decimal import Decimal +from typing import Any, Dict, List, Optional, Union from rushti.utils import ensure_shared_file, makedirs_shared, resolve_app_path @@ -30,6 +32,13 @@ # Default retention period in days DEFAULT_RETENTION_DAYS = 90 +# Default storage backend +DEFAULT_STATS_BACKEND = "sqlite" + +# Default DynamoDB resource names +DEFAULT_DYNAMODB_RUNS_TABLE = "rushti_runs" +DEFAULT_DYNAMODB_TASK_RESULTS_TABLE = "rushti_task_results" + # Schema version (development - no migrations needed) SCHEMA_VERSION = 1 @@ -730,6 +739,23 @@ def get_runs_for_workflow(self, workflow: str) -> List[Dict[str, Any]]: return [dict(row) for row in cursor.fetchall()] + def get_all_runs(self) -> List[Dict[str, Any]]: + """Get all runs across all workflows. + + :return: List of run info dictionaries, ordered by start_time descending + """ + if not self.enabled or not self._conn: + return [] + + cursor = self._conn.cursor() + cursor.execute(""" + SELECT run_id, start_time, end_time, duration_seconds, status, task_count, + success_count, failure_count, max_workers + FROM runs + ORDER BY start_time DESC + """) + return [dict(row) for row in cursor.fetchall()] + def get_run_task_stats(self, run_id: str) -> Optional[Dict[str, Any]]: """Get aggregate task statistics for a single run. @@ -807,6 +833,541 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() +class DynamoDBStatsDatabase: + """DynamoDB-backed database for execution statistics. 
+ + Expected table design: + - ``runs`` table: + - Partition key: ``run_id`` (String) + - Recommended GSI: ``workflow-start_time-index`` (PK: workflow, SK: start_time) + - ``task_results`` table: + - Partition key: ``run_id`` (String) + - Sort key: ``task_result_id`` (String) + - Recommended GSI: ``signature-start_time-index`` (PK: task_signature, SK: start_time) + - Recommended GSI: ``workflow-start_time-index`` (PK: workflow, SK: start_time) + """ + + def __init__( + self, + enabled: bool = False, + region_name: Optional[str] = None, + runs_table_name: str = DEFAULT_DYNAMODB_RUNS_TABLE, + task_results_table_name: str = DEFAULT_DYNAMODB_TASK_RESULTS_TABLE, + endpoint_url: Optional[str] = None, + ): + self.enabled = enabled + self.region_name = region_name + self.runs_table_name = runs_table_name + self.task_results_table_name = task_results_table_name + self.endpoint_url = endpoint_url + self._resource = None + self._runs_table = None + self._task_results_table = None + + if self.enabled: + self._initialize_database() + + def _initialize_database(self) -> None: + try: + import boto3 + except ImportError as e: + raise RuntimeError( + "DynamoDB backend requires boto3. Install boto3 to use [stats] backend = dynamodb." + ) from e + + resource_kwargs: Dict[str, Any] = {} + if self.region_name: + resource_kwargs["region_name"] = self.region_name + if self.endpoint_url: + resource_kwargs["endpoint_url"] = self.endpoint_url + + self._resource = boto3.resource("dynamodb", **resource_kwargs) + self._runs_table = self._resource.Table(self.runs_table_name) + self._task_results_table = self._resource.Table(self.task_results_table_name) + + # Validate table accessibility early (tables must already exist). 
+ self._runs_table.load() + self._task_results_table.load() + + logger.info( + "DynamoDB stats database initialized: runs_table=%s, task_results_table=%s", + self.runs_table_name, + self.task_results_table_name, + ) + + def _query_all(self, table, **kwargs) -> List[Dict[str, Any]]: + """Paginate a DynamoDB query, honoring Limit as a global item cap.""" + limit = kwargs.pop("Limit", None) + items: List[Dict[str, Any]] = [] + response = table.query(**kwargs) + items.extend(response.get("Items", [])) + while "LastEvaluatedKey" in response and (limit is None or len(items) < limit): + kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"] + response = table.query(**kwargs) + items.extend(response.get("Items", [])) + return items[:limit] if limit is not None else items + + def _scan_all(self, table, **kwargs) -> List[Dict[str, Any]]: + """Paginate a DynamoDB scan, honoring Limit as a global item cap.""" + limit = kwargs.pop("Limit", None) + items: List[Dict[str, Any]] = [] + response = table.scan(**kwargs) + items.extend(response.get("Items", [])) + while "LastEvaluatedKey" in response and (limit is None or len(items) < limit): + kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"] + response = table.scan(**kwargs) + items.extend(response.get("Items", [])) + return items[:limit] if limit is not None else items + + def _normalize_task_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return { + "workflow": item.get("workflow"), + "task_id": item.get("task_id"), + "task_signature": item.get("task_signature"), + "instance": item.get("instance"), + "process": item.get("process"), + "parameters": item.get("parameters", "{}"), + "status": item.get("status"), + "start_time": item.get("start_time"), + "end_time": item.get("end_time"), + "duration_seconds": self._to_float(item.get("duration_seconds")), + "retry_count": int(item["retry_count"]) if item.get("retry_count") is not None else 0, + "error_message": item.get("error_message"), + "predecessors": 
item.get("predecessors"), + "stage": item.get("stage"), + "safe_retry": item.get("safe_retry"), + "timeout": item.get("timeout"), + "cancel_at_timeout": item.get("cancel_at_timeout"), + "require_predecessor_success": item.get("require_predecessor_success"), + "succeed_on_minor_errors": item.get("succeed_on_minor_errors"), + } + + def _to_float(self, value: Any) -> Optional[float]: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + def _as_decimal(self, value: float) -> Decimal: + """Convert float to Decimal for DynamoDB storage. + + boto3's DynamoDB resource interface does not accept Python floats; + Decimal is required for all numeric values stored as DynamoDB N type. + """ + return Decimal(str(round(value, 3))) + + def start_run( + self, + run_id: str, + workflow: str, + taskfile_path: Optional[str] = None, + task_count: int = 0, + taskfile_name: Optional[str] = None, + taskfile_description: Optional[str] = None, + taskfile_author: Optional[str] = None, + max_workers: Optional[int] = None, + retries: Optional[int] = None, + result_file: Optional[str] = None, + exclusive: Optional[bool] = None, + optimize: Optional[bool] = None, + optimization_algorithm: Optional[str] = None, + ) -> None: + if not self.enabled: + return + + item = { + "run_id": run_id, + "workflow": workflow, + "taskfile_path": taskfile_path, + "start_time": datetime.now().isoformat(), + "task_count": task_count, + "taskfile_name": taskfile_name, + "taskfile_description": taskfile_description, + "taskfile_author": taskfile_author, + "max_workers": max_workers, + "retries": retries, + "result_file": result_file, + "exclusive": exclusive, + "optimize": optimize, + "optimization_algorithm": optimization_algorithm, + } + self._runs_table.put_item(Item=item) + + def record_task( + self, + run_id: str, + task_id: str, + instance: str, + process: str, + parameters: Optional[Dict[str, Any]], + success: bool, + start_time: datetime, + 
end_time: datetime, + retry_count: int = 0, + error_message: Optional[str] = None, + predecessors: Optional[List[str]] = None, + stage: Optional[str] = None, + safe_retry: Optional[bool] = None, + timeout: Optional[int] = None, + cancel_at_timeout: Optional[bool] = None, + require_predecessor_success: Optional[bool] = None, + succeed_on_minor_errors: Optional[bool] = None, + workflow: Optional[str] = None, + ) -> None: + if not self.enabled: + return + + duration = (end_time - start_time).total_seconds() + task_signature = calculate_task_signature(instance, process, parameters) + + item = { + "run_id": run_id, + "task_result_id": f"{start_time.isoformat()}#{task_id}#{uuid.uuid4().hex[:8]}", + "workflow": workflow, + "task_id": task_id, + "task_signature": task_signature, + "instance": instance, + "process": process, + "parameters": json.dumps(parameters) if parameters else "{}", + "status": "Success" if success else "Fail", + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "duration_seconds": self._as_decimal(duration), + "retry_count": retry_count, + "error_message": error_message if not success else None, + "predecessors": json.dumps(predecessors) if predecessors else None, + "stage": stage, + "safe_retry": safe_retry, + "timeout": timeout, + "cancel_at_timeout": cancel_at_timeout, + "require_predecessor_success": require_predecessor_success, + "succeed_on_minor_errors": succeed_on_minor_errors, + } + self._task_results_table.put_item(Item=item) + + def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: + if not self.enabled or not tasks: + return + + with self._task_results_table.batch_writer() as writer: + for task in tasks: + start_time = task["start_time"] + end_time = task["end_time"] + duration = (end_time - start_time).total_seconds() + instance = task["instance"] + process = task["process"] + parameters = task.get("parameters") + task_signature = calculate_task_signature(instance, process, parameters) + + 
writer.put_item( + Item={ + "run_id": task["run_id"], + "task_result_id": f"{start_time.isoformat()}#{task['task_id']}#{uuid.uuid4().hex[:8]}", + "workflow": task.get("workflow"), + "task_id": task["task_id"], + "task_signature": task_signature, + "instance": instance, + "process": process, + "parameters": json.dumps(parameters) if parameters else "{}", + "status": "Success" if task["success"] else "Fail", + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "duration_seconds": self._as_decimal(duration), + "retry_count": task.get("retry_count", 0), + "error_message": task.get("error_message") if not task["success"] else None, + "predecessors": ( + json.dumps(task.get("predecessors")) + if task.get("predecessors") + else None + ), + "stage": task.get("stage"), + "safe_retry": task.get("safe_retry"), + "timeout": task.get("timeout"), + "cancel_at_timeout": task.get("cancel_at_timeout"), + "require_predecessor_success": task.get("require_predecessor_success"), + "succeed_on_minor_errors": task.get("succeed_on_minor_errors"), + } + ) + + def complete_run( + self, + run_id: str, + status: str = "Success", + success_count: int = 0, + failure_count: int = 0, + ) -> None: + if not self.enabled: + return + + run_info = self.get_run_info(run_id) + end_time = datetime.now() + duration_seconds = None + if run_info and run_info.get("start_time"): + try: + started = datetime.fromisoformat(run_info["start_time"]) + duration_seconds = (end_time - started).total_seconds() + except (TypeError, ValueError): + duration_seconds = None + + self._runs_table.update_item( + Key={"run_id": run_id}, + UpdateExpression=( + "SET end_time = :end_time, duration_seconds = :duration, #status = :status, " + "success_count = :success_count, failure_count = :failure_count" + ), + ExpressionAttributeNames={"#status": "status"}, + ExpressionAttributeValues={ + ":end_time": end_time.isoformat(), + ":duration": ( + self._as_decimal(duration_seconds) if duration_seconds is not 
None else None + ), + ":status": status, + ":success_count": success_count, + ":failure_count": failure_count, + }, + ) + + def cleanup_old_data(self, retention_days: int) -> int: + if not self.enabled or retention_days <= 0: + return 0 + + cutoff = (datetime.now() - timedelta(days=retention_days)).isoformat() + runs = self._scan_all(self._runs_table) + old_runs = [r for r in runs if (r.get("start_time") or "") < cutoff] + + for run in old_runs: + run_id = run["run_id"] + from boto3.dynamodb.conditions import Key + + task_items = self._query_all( + self._task_results_table, + KeyConditionExpression=Key("run_id").eq(run_id), + ) + with self._task_results_table.batch_writer() as writer: + for task in task_items: + writer.delete_item( + Key={"run_id": run_id, "task_result_id": task["task_result_id"]} + ) + self._runs_table.delete_item(Key={"run_id": run_id}) + + if old_runs: + logger.info("Cleaned up %s runs older than %s days", len(old_runs), retention_days) + return len(old_runs) + + def get_task_history(self, task_signature: str, limit: int = 10) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + + try: + from boto3.dynamodb.conditions import Key + + items = self._query_all( + self._task_results_table, + IndexName="signature-start_time-index", + KeyConditionExpression=Key("task_signature").eq(task_signature), + ScanIndexForward=False, + Limit=limit, + ) + except Exception as e: + from botocore.exceptions import ClientError + + if isinstance(e, ClientError) and e.response["Error"]["Code"] not in ( + "ValidationException", + "ResourceNotFoundException", + ): + raise + logger.warning( + "GSI 'signature-start_time-index' not available, falling back to scan: %s", e + ) + items = [ + i + for i in self._scan_all(self._task_results_table) + if i.get("task_signature") == task_signature + ] + items.sort(key=lambda i: i.get("start_time", ""), reverse=True) + + results = [] + for item in items: + if item.get("status") != "Success": + continue + 
results.append(self._normalize_task_item(item)) + if len(results) >= limit: + break + return results + + def get_workflow_signatures(self, workflow: str) -> List[str]: + if not self.enabled: + return [] + + items = [ + i + for i in self._scan_all(self._task_results_table) + if i.get("workflow") == workflow and i.get("task_signature") + ] + return sorted(set(i["task_signature"] for i in items)) + + def get_task_sample_count(self, task_signature: str) -> int: + history = self.get_task_history(task_signature, limit=10000) + return len(history) + + def get_task_durations(self, task_signature: str, limit: int = 10) -> List[float]: + history = self.get_task_history(task_signature, limit=limit) + durations: List[float] = [] + for row in history: + duration = self._to_float(row.get("duration_seconds")) + if duration is not None: + durations.append(duration) + return durations + + def get_run_results(self, run_id: str) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + + from boto3.dynamodb.conditions import Key + + items = self._query_all( + self._task_results_table, + KeyConditionExpression=Key("run_id").eq(run_id), + ScanIndexForward=True, + ) + return [self._normalize_task_item(item) for item in items] + + def get_run_info(self, run_id: str) -> Optional[Dict[str, Any]]: + if not self.enabled: + return None + + response = self._runs_table.get_item(Key={"run_id": run_id}) + return response.get("Item") + + def get_runs_for_workflow(self, workflow: str) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + + try: + from boto3.dynamodb.conditions import Key + + items = self._query_all( + self._runs_table, + IndexName="workflow-start_time-index", + KeyConditionExpression=Key("workflow").eq(workflow), + ScanIndexForward=False, + ) + except Exception as e: + from botocore.exceptions import ClientError + + if isinstance(e, ClientError) and e.response["Error"]["Code"] not in ( + "ValidationException", + "ResourceNotFoundException", + ): + raise + 
logger.warning( + "GSI 'workflow-start_time-index' not available, falling back to scan: %s", e + ) + items = [i for i in self._scan_all(self._runs_table) if i.get("workflow") == workflow] + items.sort(key=lambda i: i.get("start_time", ""), reverse=True) + + results: List[Dict[str, Any]] = [] + for item in items: + results.append( + { + "run_id": item.get("run_id"), + "start_time": item.get("start_time"), + "end_time": item.get("end_time"), + "duration_seconds": self._to_float(item.get("duration_seconds")), + "status": item.get("status"), + "task_count": item.get("task_count"), + "success_count": item.get("success_count"), + "failure_count": item.get("failure_count"), + "max_workers": item.get("max_workers"), + } + ) + return results + + def get_all_runs(self) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + + items = list(self._scan_all(self._runs_table)) + items.sort(key=lambda i: i.get("start_time", ""), reverse=True) + results: List[Dict[str, Any]] = [] + for item in items: + results.append( + { + "run_id": item.get("run_id"), + "start_time": item.get("start_time"), + "end_time": item.get("end_time"), + "duration_seconds": self._to_float(item.get("duration_seconds")), + "status": item.get("status"), + "task_count": item.get("task_count"), + "success_count": item.get("success_count"), + "failure_count": item.get("failure_count"), + "max_workers": item.get("max_workers"), + } + ) + return results + + def get_run_task_stats(self, run_id: str) -> Optional[Dict[str, Any]]: + if not self.enabled: + return None + + results = [r for r in self.get_run_results(run_id) if r.get("status") == "Success"] + if not results: + return None + + durations = [d for d in (self._to_float(r.get("duration_seconds")) for r in results) if d] + if not durations: + return None + return { + "total_duration": sum(durations), + "task_count": len(durations), + "avg_duration": sum(durations) / len(durations), + } + + def get_concurrent_task_counts(self, run_id: str) -> 
List[Dict[str, Any]]: + if not self.enabled: + return [] + + results = [r for r in self.get_run_results(run_id) if r.get("status") == "Success"] + parsed = [] + for row in results: + try: + start = datetime.fromisoformat(row["start_time"]) + end = datetime.fromisoformat(row["end_time"]) + except (KeyError, TypeError, ValueError): + continue + parsed.append((row, start, end)) + + output: List[Dict[str, Any]] = [] + for row, start, end in parsed: + overlap = 0 + for other_row, other_start, other_end in parsed: + if other_row is row: + continue + if other_start < end and other_end > start: + overlap += 1 + output.append( + { + "task_signature": row.get("task_signature"), + "duration_seconds": self._to_float(row.get("duration_seconds")), + "concurrent_count": overlap, + } + ) + return output + + def close(self) -> None: + self._resource = None + self._runs_table = None + self._task_results_table = None + + def __enter__(self) -> "DynamoDBStatsDatabase": + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + def get_db_path(settings=None) -> str: """Resolve database path from settings, falling back to default. @@ -822,19 +1383,65 @@ def get_db_path(settings=None) -> str: return DEFAULT_DB_PATH +def get_stats_backend(settings=None) -> str: + """Resolve stats backend from settings, falling back to SQLite.""" + if settings is None: + return DEFAULT_STATS_BACKEND + + stats = getattr(settings, "stats", settings) + backend = getattr(stats, "backend", DEFAULT_STATS_BACKEND) + return (backend or DEFAULT_STATS_BACKEND).lower() + + def create_stats_database( enabled: bool = False, db_path: str = DEFAULT_DB_PATH, retention_days: int = DEFAULT_RETENTION_DAYS, -) -> StatsDatabase: - """Factory function to create a StatsDatabase with configuration. 
+ backend: str = DEFAULT_STATS_BACKEND, + dynamodb_region: Optional[str] = None, + dynamodb_runs_table: str = DEFAULT_DYNAMODB_RUNS_TABLE, + dynamodb_task_results_table: str = DEFAULT_DYNAMODB_TASK_RESULTS_TABLE, + dynamodb_endpoint_url: Optional[str] = None, +) -> Union["StatsDatabase", "DynamoDBStatsDatabase"]: + """Factory function to create a stats database with the configured backend. :param enabled: Whether stats collection is enabled - :param db_path: Path to database file - :param retention_days: Data retention period in days - :return: Configured StatsDatabase instance + :param db_path: Path to SQLite database file (sqlite backend only) + :param retention_days: Data retention period in days; 0 keeps data indefinitely + :param backend: Storage backend — ``"sqlite"`` (default) or ``"dynamodb"`` + :param dynamodb_region: AWS region for DynamoDB (required when backend is dynamodb + and AWS_DEFAULT_REGION / AWS_REGION env vars are not set) + :param dynamodb_runs_table: DynamoDB table name for run-level records + :param dynamodb_task_results_table: DynamoDB table name for task-level records + :param dynamodb_endpoint_url: Optional custom endpoint URL (e.g. LocalStack) + :return: Configured StatsDatabase or DynamoDBStatsDatabase instance """ - db = StatsDatabase(db_path=db_path, enabled=enabled) + backend_normalized = (backend or DEFAULT_STATS_BACKEND).lower() + + if backend_normalized == "sqlite": + db = StatsDatabase(db_path=db_path, enabled=enabled) + elif backend_normalized == "dynamodb": + if ( + enabled + and not dynamodb_region + and not os.environ.get("AWS_DEFAULT_REGION") + and not os.environ.get("AWS_REGION") + ): + raise ValueError( + "DynamoDB backend requires a region. Set 'dynamodb_region' in [stats] settings " + "or the AWS_DEFAULT_REGION / AWS_REGION environment variable." 
+ ) + db = DynamoDBStatsDatabase( + enabled=enabled, + region_name=dynamodb_region, + runs_table_name=dynamodb_runs_table, + task_results_table_name=dynamodb_task_results_table, + endpoint_url=dynamodb_endpoint_url, + ) + else: + raise ValueError( + f"Unsupported stats backend '{backend}'. " "Supported backends: sqlite, dynamodb" + ) # Run cleanup if enabled if enabled and retention_days > 0: diff --git a/src/rushti/taskfile_ops.py b/src/rushti/taskfile_ops.py index cbec7ab..4999fb9 100644 --- a/src/rushti/taskfile_ops.py +++ b/src/rushti/taskfile_ops.py @@ -797,6 +797,72 @@ def validate_taskfile_full( return result +def visualize_dag_from_db_results( + task_results: List[Dict[str, Any]], + output_path: str, + dashboard_url: Optional[str] = None, +) -> str: + """Generate interactive HTML DAG visualization from DB task results. + + Builds the DAG from task result records stored in the stats database, + so no taskfile on disk is required. + + :param task_results: List of task result dicts for a single run (from get_visualization_data) + :param output_path: Path to output HTML file + :param dashboard_url: Optional relative URL back to the dashboard HTML + :return: Path to the generated HTML file + """ + tasks_by_id: Dict[str, "TaskDefinition"] = {} + adjacency: Dict[str, List[str]] = {} + + for tr in task_results: + task_id = tr.get("task_id") + if not task_id or task_id in tasks_by_id: + continue + + params = tr.get("parameters") + if isinstance(params, str): + try: + params = json.loads(params) + except Exception: + params = {} + params = params or {} + + predecessors = tr.get("predecessors") + if isinstance(predecessors, str): + try: + predecessors = json.loads(predecessors) + except Exception: + predecessors = [] + predecessors = predecessors or [] + + tasks_by_id[task_id] = TaskDefinition( + id=task_id, + instance=tr.get("instance") or "", + process=tr.get("process") or "", + parameters=params, + predecessors=predecessors, + stage=tr.get("stage"), + ) + + for 
pred in predecessors: + if pred not in adjacency: + adjacency[pred] = [] + adjacency[pred].append(task_id) + + output_path_obj = Path(output_path) + output_base = ( + str(output_path_obj.with_suffix("")) if output_path_obj.suffix else str(output_path_obj) + ) + + return _visualize_dag_html( + adjacency=adjacency, + tasks_by_id=tasks_by_id, + filename=output_base, + dashboard_url=dashboard_url, + ) + + def _check_dag_cycles(tasks: List[TaskDefinition]) -> List[str]: """Check for cycles in task dependencies. diff --git a/tests/integration/test_exclusive.py b/tests/integration/test_exclusive.py index 2f88bba..7d7cd70 100644 --- a/tests/integration/test_exclusive.py +++ b/tests/integration/test_exclusive.py @@ -109,7 +109,7 @@ def test_check_active_sessions_excludes_own(self): # Our session should not appear in results (even if it has RushTI context) for session in sessions: - self.assertNotEqual(session.workflow, "_test-exclusive") + self.assertNotEqual(session.workflow, "test-exclusive") def test_wait_for_exclusive_no_blocking(self): """Test wait_for_exclusive_access proceeds when no blocking sessions.""" diff --git a/tests/integration/test_v11_v12_results.py b/tests/integration/test_v11_v12_results.py index c73eacc..485d7ca 100644 --- a/tests/integration/test_v11_v12_results.py +++ b/tests/integration/test_v11_v12_results.py @@ -76,6 +76,7 @@ def test_push_results_csv(self): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path = f.name + stats_db = None try: stats_db = StatsDatabase(db_path=db_path, enabled=True) run_id = "test_push_v11_" + datetime.now().strftime("%Y%m%d_%H%M%S") @@ -119,9 +120,9 @@ def test_push_results_csv(self): self.tm1.files.delete(actual_name) except Exception: pass - - stats_db.close() finally: + if stats_db is not None: + stats_db.close() os.unlink(db_path) def test_execute_with_return(self): @@ -163,6 +164,7 @@ def test_push_results_csv(self): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path 
= f.name + stats_db = None try: stats_db = StatsDatabase(db_path=db_path, enabled=True) run_id = "test_push_v12_" + datetime.now().strftime("%Y%m%d_%H%M%S") @@ -202,9 +204,9 @@ def test_push_results_csv(self): self.tm1.files.delete(file_name) except Exception: pass - - stats_db.close() finally: + if stats_db is not None: + stats_db.close() os.unlink(db_path) def test_execute_with_return(self): @@ -220,6 +222,7 @@ def test_auto_load_results(self): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path = f.name + stats_db = None try: stats_db = StatsDatabase(db_path=db_path, enabled=True) run_id = "test_autoload_v12_" + datetime.now().strftime("%Y%m%d_%H%M%S") @@ -283,9 +286,9 @@ def test_auto_load_results(self): self.tm1.files.delete(file_name) except Exception: pass - - stats_db.close() finally: + if stats_db is not None: + stats_db.close() os.unlink(db_path) diff --git a/tests/unit/test_contention_analyzer.py b/tests/unit/test_contention_analyzer.py index 56111dc..549dc9d 100644 --- a/tests/unit/test_contention_analyzer.py +++ b/tests/unit/test_contention_analyzer.py @@ -562,19 +562,20 @@ def _create_mock_stats_db(self, task_rows, ewma_durations): stats_db = Mock() stats_db.enabled = True - # Mock _conn.cursor() for _get_task_parameters - mock_cursor = Mock() - mock_rows = [] - for row in task_rows: - mock_row = { + # Mock get_runs_for_workflow: one successful run with no ceiling data + stats_db.get_runs_for_workflow.return_value = [{"run_id": "run1", "status": "Success"}] + + # Mock get_run_results for _get_task_parameters + mock_rows = [ + { "task_id": row["task_id"], "task_signature": row["task_signature"], "process": row["process"], "parameters": json.dumps(row["parameters"]), } - mock_rows.append(mock_row) - mock_cursor.fetchall.return_value = mock_rows - stats_db._conn.cursor.return_value = mock_cursor + for row in task_rows + ] + stats_db.get_run_results.return_value = mock_rows # Mock get_workflow_signatures 
stats_db.get_workflow_signatures.return_value = list(ewma_durations.keys()) @@ -585,8 +586,9 @@ def get_durations(sig, limit=10): stats_db.get_task_durations.side_effect = get_durations - # Mock ceiling-detection methods (default: no runs for ceiling analysis) - stats_db.get_runs_for_workflow.return_value = [] + # Ceiling detection: no stats or concurrent data by default + stats_db.get_run_task_stats.return_value = None + stats_db.get_concurrent_task_counts.return_value = [] return stats_db @@ -923,9 +925,8 @@ class TestGetArchivedTaskfilePath(TestCase): def test_returns_path_for_successful_run(self): """Returns taskfile_path from the most recent successful run.""" stats_db = Mock() - mock_cursor = Mock() - mock_cursor.fetchone.return_value = {"taskfile_path": "/archive/my_workflow/run123.json"} - stats_db._conn.cursor.return_value = mock_cursor + stats_db.get_runs_for_workflow.return_value = [{"run_id": "run123", "status": "Success"}] + stats_db.get_run_info.return_value = {"taskfile_path": "/archive/my_workflow/run123.json"} result = get_archived_taskfile_path(stats_db, "my_workflow") self.assertEqual(result, "/archive/my_workflow/run123.json") @@ -933,9 +934,7 @@ def test_returns_path_for_successful_run(self): def test_returns_none_when_no_runs(self): """Returns None when no successful runs exist.""" stats_db = Mock() - mock_cursor = Mock() - mock_cursor.fetchone.return_value = None - stats_db._conn.cursor.return_value = mock_cursor + stats_db.get_runs_for_workflow.return_value = [] result = get_archived_taskfile_path(stats_db, "empty_workflow") self.assertIsNone(result) @@ -943,9 +942,8 @@ def test_returns_none_when_no_runs(self): def test_returns_none_when_path_is_null(self): """Returns None when taskfile_path is NULL in the database.""" stats_db = Mock() - mock_cursor = Mock() - mock_cursor.fetchone.return_value = {"taskfile_path": None} - stats_db._conn.cursor.return_value = mock_cursor + stats_db.get_runs_for_workflow.return_value = [{"run_id": "run123", 
"status": "Success"}] + stats_db.get_run_info.return_value = {"taskfile_path": None} result = get_archived_taskfile_path(stats_db, "workflow_with_null") self.assertIsNone(result) @@ -1340,19 +1338,21 @@ def _create_mock_stats_db(self, task_rows, ewma_durations, runs=None): stats_db = Mock() stats_db.enabled = True - # Mock _conn.cursor() for _get_task_parameters - mock_cursor = Mock() - mock_rows = [] - for row in task_rows: - mock_row = { + # If no runs provided, use a minimal successful run (no ceiling data) + effective_runs = runs if runs is not None else [{"run_id": "run1", "status": "Success"}] + stats_db.get_runs_for_workflow.return_value = effective_runs + + # Mock get_run_results for _get_task_parameters + mock_rows = [ + { "task_id": row["task_id"], "task_signature": row["task_signature"], "process": row["process"], "parameters": json.dumps(row["parameters"]), } - mock_rows.append(mock_row) - mock_cursor.fetchall.return_value = mock_rows - stats_db._conn.cursor.return_value = mock_cursor + for row in task_rows + ] + stats_db.get_run_results.return_value = mock_rows # Mock get_workflow_signatures stats_db.get_workflow_signatures.return_value = list(ewma_durations.keys()) @@ -1363,8 +1363,7 @@ def get_durations(sig, limit=10): stats_db.get_task_durations.side_effect = get_durations - # Mock ceiling-detection methods - stats_db.get_runs_for_workflow.return_value = runs or [] + # Ceiling detection defaults stats_db.get_run_task_stats.return_value = None stats_db.get_concurrent_task_counts.return_value = [] diff --git a/tests/unit/test_dashboard.py b/tests/unit/test_dashboard.py index 5f484f2..bff66d2 100644 --- a/tests/unit/test_dashboard.py +++ b/tests/unit/test_dashboard.py @@ -213,6 +213,28 @@ def test_stage_field_in_task_results(self): data = _prepare_dashboard_data(runs, tasks, default_runs=5) self.assertEqual(data["task_results"][0]["stage"], "extract") + def test_workflow_metadata_with_multiple_workflows(self): + """Test workflow selector metadata is 
prepared for multiple workflows.""" + runs = [ + _make_run(run_id="run-a1", workflow="wf-a"), + _make_run(run_id="run-b1", workflow="wf-b"), + _make_run(run_id="run-a2", workflow="wf-a"), + ] + tasks = [ + _make_task_result(run_id="run-a1", task_id="1", task_signature="a1"), + _make_task_result(run_id="run-b1", task_id="2", task_signature="b1"), + _make_task_result(run_id="run-a2", task_id="3", task_signature="a2"), + ] + + data = _prepare_dashboard_data(runs, tasks, default_runs=5, selected_workflow="wf-a") + + self.assertEqual(data["workflow"], "wf-a") + self.assertIn("wf-a", data["workflows"]) + self.assertIn("wf-b", data["workflows"]) + self.assertEqual(data["workflow_meta"]["wf-a"]["run_count"], 2) + self.assertEqual(data["workflow_meta"]["wf-b"]["run_count"], 1) + self.assertEqual(data["default_runs"], 2) + class TestGenerateDashboard(unittest.TestCase): """Tests for generate_dashboard HTML generation.""" @@ -294,6 +316,15 @@ def test_page_size_options(self): self.assertNotIn('', content) self.assertNotIn('', content) + def test_html_contains_workflow_selector(self): + """Test that generated HTML includes workflow selector controls.""" + output = os.path.join(self.temp_dir, "test_dashboard.html") + generate_dashboard("test_id", self.runs, self.tasks, output) + with open(output, encoding="utf-8") as f: + content = f.read() + self.assertIn('id="workflowSelect"', content) + self.assertIn("onWorkflowChange()", content) + def test_dag_url_link_present(self): """Test that 'View DAG' link appears when dag_url is provided.""" output = os.path.join(self.temp_dir, "test_dashboard.html") diff --git a/tests/unit/test_db_admin.py b/tests/unit/test_db_admin.py index ad0b30a..f6e4338 100644 --- a/tests/unit/test_db_admin.py +++ b/tests/unit/test_db_admin.py @@ -6,10 +6,13 @@ """ import os -import sqlite3 import tempfile import unittest from datetime import datetime, timedelta +from decimal import Decimal +from unittest.mock import MagicMock + +from rushti.stats import 
StatsDatabase from rushti.db_admin import ( clear_all, @@ -18,10 +21,11 @@ clear_workflow, export_to_csv, get_db_stats, + get_visualization_data, get_workflow_stats, list_runs, - list_workflows, list_tasks, + list_workflows, show_run_details, show_task_history, vacuum_database, @@ -37,49 +41,33 @@ def setUp(self): self.temp_db.close() self.db_path = self.temp_db.name - # Create database schema - conn = sqlite3.connect(self.db_path) + # Use StatsDatabase to create the full schema and open a managed connection. + self.stats_db = StatsDatabase(db_path=self.db_path, enabled=True) + conn = self.stats_db._conn cursor = conn.cursor() - # Create task_results table - cursor.execute(""" - CREATE TABLE task_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - run_id TEXT NOT NULL, - workflow TEXT NOT NULL, - task_id TEXT NOT NULL, - task_signature TEXT NOT NULL, - instance TEXT NOT NULL, - process TEXT NOT NULL, - parameters TEXT, - status TEXT NOT NULL, - start_time TEXT NOT NULL, - end_time TEXT NOT NULL, - duration_seconds REAL, - retry_count INTEGER DEFAULT 0, - error_message TEXT - ) - """) - - # Create runs table - cursor.execute(""" - CREATE TABLE runs ( - run_id TEXT PRIMARY KEY, - workflow TEXT NOT NULL, - start_time TEXT NOT NULL, - end_time TEXT, - duration_seconds REAL, - task_count INTEGER, - success_count INTEGER, - failure_count INTEGER - ) - """) - # Insert sample data now = datetime.now() yesterday = now - timedelta(days=1) week_ago = now - timedelta(days=7) + # Insert runs first (task_results has a FK on run_id) + run_data = [ + ("run1", "taskfile1", week_ago.isoformat(), week_ago.isoformat(), 3.5, 2, 2, 0), + ("run2", "taskfile1", yesterday.isoformat(), yesterday.isoformat(), 1.0, 1, 0, 1), + ("run3", "taskfile2", now.isoformat(), now.isoformat(), 3.0, 1, 1, 0), + ] + + cursor.executemany( + """ + INSERT INTO runs + (run_id, workflow, start_time, end_time, duration_seconds, + task_count, success_count, failure_count) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + run_data, + ) + # Sample task results sample_data = [ ( @@ -154,30 +142,15 @@ def setUp(self): sample_data, ) - # Sample runs - run_data = [ - ("run1", "taskfile1", week_ago.isoformat(), week_ago.isoformat(), 3.5, 2, 2, 0), - ("run2", "taskfile1", yesterday.isoformat(), yesterday.isoformat(), 1.0, 1, 0, 1), - ("run3", "taskfile2", now.isoformat(), now.isoformat(), 3.0, 1, 1, 0), - ] - - cursor.executemany( - """ - INSERT INTO runs - (run_id, workflow, start_time, end_time, duration_seconds, - task_count, success_count, failure_count) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, - run_data, - ) - conn.commit() - conn.close() def tearDown(self): """Remove temporary database.""" - if os.path.exists(self.db_path): - os.unlink(self.db_path) + self.stats_db.close() + for suffix in ("", "-wal", "-shm"): + path = self.db_path + suffix + if os.path.exists(path): + os.unlink(path) def test_get_db_stats(self): """Test retrieving overall database statistics.""" @@ -224,7 +197,7 @@ def test_list_workflows(self): def test_list_runs(self): """Test listing runs for a workflow.""" - runs = list_runs("taskfile1", self.db_path) + runs = list_runs("taskfile1", self.stats_db) self.assertEqual(len(runs), 2) self.assertEqual(runs[0]["task_count"], 1) # Most recent first @@ -232,13 +205,41 @@ def test_list_runs(self): def test_list_tasks(self): """Test listing unique tasks for a workflow.""" - tasks = list_tasks("taskfile1", self.db_path) + tasks = list_tasks("taskfile1", self.stats_db) self.assertEqual(len(tasks), 2) task_ids = [t["task_id"] for t in tasks] self.assertIn("task1", task_ids) self.assertIn("task2", task_ids) + def test_get_visualization_data_include_all_workflows_sqlite(self): + """include_all_workflows should embed all workflows in one payload (SQLite path).""" + data = get_visualization_data( + "taskfile1", + self.db_path, + include_all_workflows=True, + ) + + self.assertTrue(data["exists"]) + self.assertEqual(data["workflow"], "taskfile1") + 
self.assertEqual(len(data["runs"]), 3) + self.assertEqual(len(data["task_results"]), 4) + + workflows = {r["workflow"] for r in data["runs"]} + self.assertIn("taskfile1", workflows) + self.assertIn("taskfile2", workflows) + + def test_get_visualization_data_include_all_requires_selected_workflow(self): + """include_all_workflows still fails when selected workflow does not exist.""" + data = get_visualization_data( + "missing-workflow", + self.db_path, + include_all_workflows=True, + ) + + self.assertFalse(data["exists"]) + self.assertIn("No runs found for workflow", data["message"]) + def test_clear_workflow_dry_run(self): """Test dry run of clearing workflow data.""" count = clear_workflow("taskfile1", self.db_path, dry_run=True) @@ -366,5 +367,302 @@ def test_show_task_history_nonexistent(self): self.assertIn("message", history) +class TestDatabaseAdminUtilitiesDDB(unittest.TestCase): + """Tests for db_admin functions using a mock DynamoDB stats_db. + + These are symmetric with TestDatabaseAdminUtilities but exercise the + DDB dispatch path in list_runs, list_tasks, and get_visualization_data. 
+ """ + + def _make_mock_stats_db(self): + """Return a MagicMock that mimics DynamoDBStatsDatabase method returns.""" + now = datetime.now() + yesterday = now - timedelta(days=1) + week_ago = now - timedelta(days=7) + + mock_db = MagicMock() + + # Two runs for taskfile1 ordered most-recent-first (as get_runs_for_workflow returns) + mock_db.get_runs_for_workflow.return_value = [ + { + "run_id": "run2", + "start_time": yesterday.isoformat(), + "end_time": yesterday.isoformat(), + "duration_seconds": 1.0, + "status": "Success", + "task_count": 1, + "success_count": 0, + "failure_count": 1, + "max_workers": 2, + }, + { + "run_id": "run1", + "start_time": week_ago.isoformat(), + "end_time": week_ago.isoformat(), + "duration_seconds": 3.5, + "status": "Success", + "task_count": 2, + "success_count": 2, + "failure_count": 0, + "max_workers": 2, + }, + ] + + # Distinct signatures for taskfile1 + mock_db.get_workflow_signatures.return_value = ["sig1", "sig2"] + + _run1_tasks = [ + { + "task_id": "task1", + "task_signature": "sig1", + "workflow": "taskfile1", + "instance": "inst1", + "process": "proc1", + "parameters": "{}", + "status": "Success", + "start_time": week_ago.isoformat(), + "end_time": week_ago.isoformat(), + "duration_seconds": 1.5, + "retry_count": 0, + "error_message": None, + "predecessors": None, + "stage": None, + "safe_retry": None, + "timeout": None, + "cancel_at_timeout": None, + "require_predecessor_success": None, + "succeed_on_minor_errors": None, + }, + { + "task_id": "task2", + "task_signature": "sig2", + "workflow": "taskfile1", + "instance": "inst1", + "process": "proc2", + "parameters": "{}", + "status": "Success", + "start_time": week_ago.isoformat(), + "end_time": week_ago.isoformat(), + "duration_seconds": 2.0, + "retry_count": 0, + "error_message": None, + "predecessors": None, + "stage": None, + "safe_retry": None, + "timeout": None, + "cancel_at_timeout": None, + "require_predecessor_success": None, + "succeed_on_minor_errors": None, + }, 
+ ] + _run2_tasks = [ + { + "task_id": "task1", + "task_signature": "sig1", + "workflow": "taskfile1", + "instance": "inst1", + "process": "proc1", + "parameters": "{}", + "status": "Fail", + "start_time": yesterday.isoformat(), + "end_time": yesterday.isoformat(), + "duration_seconds": 1.0, + "retry_count": 1, + "error_message": "Error message", + "predecessors": None, + "stage": None, + "safe_retry": None, + "timeout": None, + "cancel_at_timeout": None, + "require_predecessor_success": None, + "succeed_on_minor_errors": None, + }, + ] + mock_db.get_run_results.side_effect = lambda run_id: { + "run1": _run1_tasks, + "run2": _run2_tasks, + }.get(run_id, []) + + # get_run_info returns raw DDB items (with Decimal numerics) + mock_db.get_run_info.side_effect = lambda run_id: { + "run1": { + "run_id": "run1", + "workflow": "taskfile1", + "taskfile_path": "/path/tasks.txt", + "start_time": week_ago.isoformat(), + "end_time": week_ago.isoformat(), + "duration_seconds": Decimal("3.5"), + "status": "Success", + "task_count": Decimal("2"), + "success_count": Decimal("2"), + "failure_count": Decimal("0"), + "taskfile_name": "Daily Load", + "taskfile_description": None, + "taskfile_author": None, + "max_workers": Decimal("2"), + "retries": Decimal("0"), + "result_file": None, + "exclusive": None, + "optimize": None, + }, + "run2": { + "run_id": "run2", + "workflow": "taskfile1", + "taskfile_path": "/path/tasks.txt", + "start_time": yesterday.isoformat(), + "end_time": yesterday.isoformat(), + "duration_seconds": Decimal("1.0"), + "status": "Success", + "task_count": Decimal("1"), + "success_count": Decimal("0"), + "failure_count": Decimal("1"), + "taskfile_name": "Daily Load", + "taskfile_description": None, + "taskfile_author": None, + "max_workers": Decimal("2"), + "retries": Decimal("0"), + "result_file": None, + "exclusive": None, + "optimize": None, + }, + }.get(run_id) + + return mock_db + + # ------------------------------------------------------------------ + # 
list_runs DDB path + # ------------------------------------------------------------------ + + def test_list_runs_returns_correct_count(self): + runs = list_runs("taskfile1", self._make_mock_stats_db()) + self.assertEqual(len(runs), 2) + + def test_list_runs_most_recent_first(self): + runs = list_runs("taskfile1", self._make_mock_stats_db()) + self.assertEqual(runs[0]["run_id"], "run2") + self.assertEqual(runs[1]["run_id"], "run1") + + def test_list_runs_task_count(self): + runs = list_runs("taskfile1", self._make_mock_stats_db()) + self.assertEqual(runs[0]["task_count"], 1) + self.assertEqual(runs[1]["task_count"], 2) + + def test_list_runs_success_rate(self): + runs = list_runs("taskfile1", self._make_mock_stats_db()) + self.assertAlmostEqual(runs[0]["success_rate"], 0.0) # run2: 0/1 + self.assertAlmostEqual(runs[1]["success_rate"], 100.0) # run1: 2/2 + + def test_list_runs_total_duration_sums_task_results(self): + runs = list_runs("taskfile1", self._make_mock_stats_db()) + # run2: 1 task × 1.0 s + self.assertAlmostEqual(runs[0]["total_duration"], 1.0) + # run1: 1.5 s + 2.0 s = 3.5 s + self.assertAlmostEqual(runs[1]["total_duration"], 3.5) + + def test_list_runs_respects_limit(self): + runs = list_runs("taskfile1", self._make_mock_stats_db(), limit=1) + self.assertEqual(len(runs), 1) + + def test_list_runs_returns_required_keys(self): + runs = list_runs("taskfile1", self._make_mock_stats_db()) + required = {"run_id", "start_time", "task_count", "success_rate", "total_duration"} + self.assertTrue(required.issubset(runs[0].keys())) + + def test_list_runs_empty_when_no_runs(self): + mock_db = MagicMock() + mock_db.get_runs_for_workflow.return_value = [] + runs = list_runs("missing", mock_db) + self.assertEqual(runs, []) + + # ------------------------------------------------------------------ + # list_tasks DDB path + # ------------------------------------------------------------------ + + def test_list_tasks_returns_unique_signatures(self): + tasks = 
list_tasks("taskfile1", self._make_mock_stats_db()) + signatures = {t["task_signature"] for t in tasks} + self.assertIn("sig1", signatures) + self.assertIn("sig2", signatures) + + def test_list_tasks_aggregates_across_all_runs(self): + tasks = list_tasks("taskfile1", self._make_mock_stats_db()) + sig1 = next(t for t in tasks if t["task_signature"] == "sig1") + # sig1 appears in both run1 and run2 + self.assertEqual(sig1["execution_count"], 2) + + def test_list_tasks_success_rate(self): + tasks = list_tasks("taskfile1", self._make_mock_stats_db()) + sig1 = next(t for t in tasks if t["task_signature"] == "sig1") + self.assertAlmostEqual(sig1["success_rate"], 50.0) # 1 of 2 + sig2 = next(t for t in tasks if t["task_signature"] == "sig2") + self.assertAlmostEqual(sig2["success_rate"], 100.0) # 1 of 1 + + def test_list_tasks_avg_duration(self): + tasks = list_tasks("taskfile1", self._make_mock_stats_db()) + sig1 = next(t for t in tasks if t["task_signature"] == "sig1") + # (1.5 + 1.0) / 2 = 1.25 + self.assertAlmostEqual(sig1["avg_duration"], 1.25) + + def test_list_tasks_returns_required_keys(self): + tasks = list_tasks("taskfile1", self._make_mock_stats_db()) + required = {"task_signature", "execution_count", "success_rate", "avg_duration"} + self.assertTrue(required.issubset(tasks[0].keys())) + + def test_list_tasks_empty_when_no_runs(self): + mock_db = MagicMock() + mock_db.get_runs_for_workflow.return_value = [] + mock_db.get_workflow_signatures.return_value = [] + tasks = list_tasks("missing", mock_db) + self.assertEqual(tasks, []) + + # ------------------------------------------------------------------ + # get_visualization_data DDB path + # ------------------------------------------------------------------ + + def test_get_visualization_data_exists(self): + data = get_visualization_data("taskfile1", self._make_mock_stats_db()) + self.assertTrue(data["exists"]) + + def test_get_visualization_data_run_count(self): + data = get_visualization_data("taskfile1", 
self._make_mock_stats_db()) + self.assertEqual(len(data["runs"]), 2) + + def test_get_visualization_data_task_result_count(self): + data = get_visualization_data("taskfile1", self._make_mock_stats_db()) + # run1 has 2 tasks, run2 has 1 task + self.assertEqual(len(data["task_results"]), 3) + + def test_get_visualization_data_task_results_have_run_id(self): + data = get_visualization_data("taskfile1", self._make_mock_stats_db()) + for tr in data["task_results"]: + self.assertIn("run_id", tr) + + def test_get_visualization_data_converts_decimal_duration_to_float(self): + data = get_visualization_data("taskfile1", self._make_mock_stats_db()) + for run in data["runs"]: + if run.get("duration_seconds") is not None: + self.assertIsInstance(run["duration_seconds"], float) + + def test_get_visualization_data_converts_decimal_task_count_to_int(self): + data = get_visualization_data("taskfile1", self._make_mock_stats_db()) + for run in data["runs"]: + if run.get("task_count") is not None: + self.assertIsInstance(run["task_count"], int) + + def test_get_visualization_data_includes_run_metadata_from_run_info(self): + data = get_visualization_data("taskfile1", self._make_mock_stats_db()) + run1 = next(r for r in data["runs"] if r["run_id"] == "run1") + self.assertEqual(run1["taskfile_name"], "Daily Load") + self.assertEqual(run1["taskfile_path"], "/path/tasks.txt") + self.assertEqual(run1["workflow"], "taskfile1") + + def test_get_visualization_data_not_exists_when_no_runs(self): + mock_db = MagicMock() + mock_db.get_runs_for_workflow.return_value = [] + data = get_visualization_data("missing", mock_db) + self.assertFalse(data["exists"]) + self.assertIn("message", data) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_exclusive.py b/tests/unit/test_exclusive.py index a9bb925..d832d49 100644 --- a/tests/unit/test_exclusive.py +++ b/tests/unit/test_exclusive.py @@ -39,14 +39,14 @@ def test_build_context_truncation(self): 
self.assertTrue(context.startswith("RushTI_")) def test_build_context_empty_id(self): - """Test building context with empty workflow (no trailing underscore).""" + """Test building context with empty workflow.""" context = build_session_context("", exclusive=False) - self.assertEqual(context, "RushTI") + self.assertEqual(context, "RushTI_") def test_build_context_empty_id_exclusive(self): """Test building context with empty workflow in exclusive mode.""" context = build_session_context("", exclusive=True) - self.assertEqual(context, "RushTIX") + self.assertEqual(context, "RushTIX_") def test_build_context_special_characters(self): """Test context with special characters in workflow.""" @@ -63,7 +63,7 @@ def test_parse_normal_mode_context(self): self.assertIsNotNone(result) is_exclusive, workflow = result self.assertFalse(is_exclusive) - self.assertEqual(workflow, "_daily-etl") + self.assertEqual(workflow, "daily-etl") def test_parse_exclusive_mode_context(self): """Test parsing exclusive mode context.""" @@ -71,15 +71,12 @@ def test_parse_exclusive_mode_context(self): self.assertIsNotNone(result) is_exclusive, workflow = result self.assertTrue(is_exclusive) - self.assertEqual(workflow, "_daily-etl") + self.assertEqual(workflow, "daily-etl") def test_parse_context_without_underscore(self): - """Test parsing context without underscore (legacy format).""" + """Test that context without underscore separator is not a RushTI session.""" result = parse_session_context("RushTIdaily-etl") - self.assertIsNotNone(result) - is_exclusive, workflow = result - self.assertFalse(is_exclusive) - self.assertEqual(workflow, "daily-etl") + self.assertIsNone(result) def test_parse_non_rushti_context(self): """Test parsing non-RushTI context returns None.""" @@ -97,12 +94,13 @@ def test_parse_none_context(self): self.assertIsNone(result) def test_parse_rushti_prefix_only(self): - """Test parsing context with only RushTI prefix.""" + """Test that bare 'RushTI' prefix (no underscore) is 
not a RushTI session. + + Bare 'RushTI' can appear as a TM1py application_name and must not + be mistaken for a RushTI session context. + """ result = parse_session_context("RushTI") - self.assertIsNotNone(result) - is_exclusive, workflow = result - self.assertFalse(is_exclusive) - self.assertEqual(workflow, "") + self.assertIsNone(result) class TestShouldWaitForSessions(unittest.TestCase): @@ -305,7 +303,7 @@ def test_excludes_own_session_by_id(self): # Should only find the other session, not our own self.assertEqual(len(result), 1) - self.assertEqual(result[0].workflow, "_other-task") + self.assertEqual(result[0].workflow, "other-task") def test_excludes_own_context(self): """Test that own session context is excluded when specified.""" @@ -328,7 +326,7 @@ def test_excludes_own_context(self): # Should only find the other-task context self.assertEqual(len(result), 1) - self.assertEqual(result[0].workflow, "_other-task") + self.assertEqual(result[0].workflow, "other-task") def test_multiple_instances(self): """Test checking sessions across multiple TM1 instances.""" diff --git a/tests/unit/test_settings.py b/tests/unit/test_settings.py index 23b1a81..7b700ca 100644 --- a/tests/unit/test_settings.py +++ b/tests/unit/test_settings.py @@ -87,6 +87,44 @@ def test_load_settings_all_sections(self): finally: os.unlink(settings_path) + def test_load_settings_stats_dynamodb_fields(self): + """Test loading DynamoDB-specific stats settings.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f: + f.write("[stats]\n") + f.write("enabled = true\n") + f.write("backend = dynamodb\n") + f.write("dynamodb_region = eu-west-1\n") + f.write("dynamodb_runs_table = custom_runs\n") + f.write("dynamodb_task_results_table = custom_task_results\n") + f.write("dynamodb_endpoint_url = http://localhost:4566\n") + f.flush() + settings_path = f.name + + try: + settings = load_settings(settings_path) + self.assertEqual(settings.stats.backend, "dynamodb") + 
self.assertEqual(settings.stats.dynamodb_region, "eu-west-1") + self.assertEqual(settings.stats.dynamodb_runs_table, "custom_runs") + self.assertEqual(settings.stats.dynamodb_task_results_table, "custom_task_results") + self.assertEqual(settings.stats.dynamodb_endpoint_url, "http://localhost:4566") + finally: + os.unlink(settings_path) + + def test_load_settings_stats_invalid_backend(self): + """Test invalid stats backend raises ValueError.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f: + f.write("[stats]\n") + f.write("backend = postgresql\n") + f.flush() + settings_path = f.name + + try: + with self.assertRaises(ValueError) as ctx: + load_settings(settings_path) + self.assertIn("backend", str(ctx.exception)) + finally: + os.unlink(settings_path) + class TestSettingsValidation(unittest.TestCase): """Tests for settings validation""" diff --git a/tests/unit/test_stats.py b/tests/unit/test_stats.py index de7c579..05e4768 100644 --- a/tests/unit/test_stats.py +++ b/tests/unit/test_stats.py @@ -2,16 +2,27 @@ import os import tempfile +import unittest from datetime import datetime, timedelta +from decimal import Decimal from unittest import TestCase +from unittest.mock import MagicMock, patch from rushti.stats import ( + DynamoDBStatsDatabase, StatsDatabase, calculate_task_signature, create_stats_database, ) +try: + import botocore.exceptions # noqa: F401 + + BOTOCORE_AVAILABLE = True +except ImportError: + BOTOCORE_AVAILABLE = False + class TestTaskSignature(TestCase): """Tests for task signature calculation.""" @@ -334,6 +345,11 @@ def test_factory_runs_cleanup(self): self.assertIsNone(db2.get_run_info("old_run")) db2.close() + def test_factory_invalid_backend_raises(self): + """Factory should reject unsupported storage backends.""" + with self.assertRaises(ValueError): + create_stats_database(enabled=True, backend="unknown_backend") + class TestStatsDatabaseContextManager(TestCase): """Tests for context manager protocol.""" @@ 
-650,3 +666,335 @@ def test_batch_record_tasks_stores_workflow(self): self.assertEqual(results[0]["workflow"], "taskfile_xyz") self.assertEqual(results[1]["workflow"], "taskfile_xyz") db.close() + + +# --------------------------------------------------------------------------- +# DynamoDB backend tests +# --------------------------------------------------------------------------- + +_MOCK_TASK_ITEM = { + "run_id": "run1", + "task_result_id": "tr1", + "task_id": "task1", + "task_signature": "sig1", + "workflow": "taskfile1", + "instance": "inst1", + "process": "proc1", + "parameters": '{"p1": "v1"}', + "status": "Success", + "start_time": "2024-01-01T10:00:00", + "end_time": "2024-01-01T10:00:05", + "duration_seconds": Decimal("5.123"), + "retry_count": Decimal("2"), + "error_message": None, + "predecessors": None, + "stage": None, + "safe_retry": None, + "timeout": None, + "cancel_at_timeout": None, + "require_predecessor_success": None, + "succeed_on_minor_errors": None, +} + +_MOCK_RUN_ITEM = { + "run_id": "run1", + "workflow": "taskfile1", + "taskfile_path": "/path/tasks.txt", + "start_time": "2024-01-01T10:00:00", + "end_time": "2024-01-01T10:05:00", + "duration_seconds": Decimal("300.5"), + "status": "Success", + "task_count": Decimal("2"), + "success_count": Decimal("2"), + "failure_count": Decimal("0"), + "taskfile_name": "Daily Load", + "taskfile_description": None, + "taskfile_author": None, + "max_workers": Decimal("4"), + "retries": Decimal("0"), + "result_file": None, + "exclusive": None, + "optimize": None, +} + + +def _make_ddb(enabled=True): + """Create a DynamoDBStatsDatabase with _initialize_database patched out.""" + with patch.object(DynamoDBStatsDatabase, "_initialize_database"): + db = DynamoDBStatsDatabase(enabled=enabled) + db._runs_table = MagicMock() + db._task_results_table = MagicMock() + return db + + +class TestDynamoDBStatsDatabaseNormalization(TestCase): + """Tests for _normalize_task_item type conversion (Decimal → Python types).""" 
+ + def setUp(self): + self.db = _make_ddb() + + def test_duration_seconds_converted_to_float(self): + result = self.db._normalize_task_item(_MOCK_TASK_ITEM) + self.assertIsInstance(result["duration_seconds"], float) + self.assertAlmostEqual(result["duration_seconds"], 5.123, places=3) + + def test_retry_count_converted_to_int(self): + result = self.db._normalize_task_item(_MOCK_TASK_ITEM) + self.assertIsInstance(result["retry_count"], int) + self.assertEqual(result["retry_count"], 2) + + def test_none_duration_stays_none(self): + item = dict(_MOCK_TASK_ITEM, duration_seconds=None) + result = self.db._normalize_task_item(item) + self.assertIsNone(result["duration_seconds"]) + + def test_none_retry_count_defaults_to_zero(self): + item = dict(_MOCK_TASK_ITEM, retry_count=None) + result = self.db._normalize_task_item(item) + self.assertEqual(result["retry_count"], 0) + + def test_all_expected_keys_present(self): + result = self.db._normalize_task_item(_MOCK_TASK_ITEM) + expected_keys = { + "workflow", + "task_id", + "task_signature", + "instance", + "process", + "parameters", + "status", + "start_time", + "end_time", + "duration_seconds", + "retry_count", + "error_message", + "predecessors", + "stage", + } + self.assertTrue(expected_keys.issubset(result.keys())) + + +class TestDynamoDBStatsDatabasePagination(TestCase): + """Tests for _query_all and _scan_all global Limit handling.""" + + def setUp(self): + self.db = _make_ddb() + + def _items(self, n): + return [{"id": str(i)} for i in range(n)] + + def test_query_all_paginates_to_exhaustion_without_limit(self): + table = MagicMock() + table.query.side_effect = [ + {"Items": self._items(3), "LastEvaluatedKey": {"id": "2"}}, + {"Items": self._items(3)}, + ] + items = self.db._query_all(table) + self.assertEqual(len(items), 6) + self.assertEqual(table.query.call_count, 2) + + def test_query_all_respects_limit_as_global_cap(self): + table = MagicMock() + table.query.side_effect = [ + {"Items": self._items(3), 
"LastEvaluatedKey": {"id": "2"}}, + {"Items": self._items(3)}, + ] + items = self.db._query_all(table, Limit=4) + self.assertEqual(len(items), 4) + + def test_query_all_does_not_forward_limit_to_dynamodb(self): + table = MagicMock() + table.query.return_value = {"Items": self._items(2)} + self.db._query_all(table, Limit=10, ScanIndexForward=False) + call_kwargs = table.query.call_args[1] + self.assertNotIn("Limit", call_kwargs) + self.assertIn("ScanIndexForward", call_kwargs) + + def test_scan_all_respects_limit_as_global_cap(self): + table = MagicMock() + table.scan.side_effect = [ + {"Items": self._items(3), "LastEvaluatedKey": {"id": "2"}}, + {"Items": self._items(3)}, + ] + items = self.db._scan_all(table, Limit=5) + self.assertEqual(len(items), 5) + + def test_scan_all_does_not_forward_limit_to_dynamodb(self): + table = MagicMock() + table.scan.return_value = {"Items": self._items(2)} + self.db._scan_all(table, Limit=10) + call_kwargs = table.scan.call_args[1] + self.assertNotIn("Limit", call_kwargs) + + def test_query_all_stops_when_no_last_evaluated_key(self): + table = MagicMock() + table.query.return_value = {"Items": self._items(3)} # no continuation + items = self.db._query_all(table) + self.assertEqual(len(items), 3) + self.assertEqual(table.query.call_count, 1) + + +@unittest.skipUnless(BOTOCORE_AVAILABLE, "botocore not installed") +class TestDynamoDBStatsDatabaseFallback(TestCase): + """Tests for GSI-missing fallback in get_task_history and get_runs_for_workflow.""" + + def setUp(self): + self.db = _make_ddb() + # get_task_history / get_runs_for_workflow do `from boto3.dynamodb.conditions import Key` + # inside the try block. Without boto3 installed that import itself raises + # ModuleNotFoundError before our mock side_effect fires, so we inject a stub. 
+ self._boto3_patch = patch.dict( + "sys.modules", + { + "boto3": MagicMock(), + "boto3.dynamodb": MagicMock(), + "boto3.dynamodb.conditions": MagicMock(), + }, + ) + self._boto3_patch.start() + + def tearDown(self): + self._boto3_patch.stop() + + def _client_error(self, code): + from botocore.exceptions import ClientError + + return ClientError({"Error": {"Code": code, "Message": "test"}}, "Query") + + def test_get_task_history_falls_back_to_scan_on_validation_exception(self): + self.db._task_results_table.query.side_effect = self._client_error("ValidationException") + self.db._task_results_table.scan.return_value = {"Items": []} + self.db.get_task_history("sig1") + self.db._task_results_table.scan.assert_called() + + def test_get_task_history_falls_back_to_scan_on_resource_not_found(self): + self.db._task_results_table.query.side_effect = self._client_error( + "ResourceNotFoundException" + ) + self.db._task_results_table.scan.return_value = {"Items": []} + self.db.get_task_history("sig1") + self.db._task_results_table.scan.assert_called() + + def test_get_task_history_reraises_access_denied(self): + from botocore.exceptions import ClientError + + self.db._task_results_table.query.side_effect = self._client_error("AccessDeniedException") + with self.assertRaises(ClientError): + self.db.get_task_history("sig1") + + def test_get_runs_for_workflow_falls_back_to_scan_on_validation_exception(self): + self.db._runs_table.query.side_effect = self._client_error("ValidationException") + self.db._runs_table.scan.return_value = {"Items": []} + self.db.get_runs_for_workflow("wf1") + self.db._runs_table.scan.assert_called() + + def test_get_runs_for_workflow_reraises_access_denied(self): + from botocore.exceptions import ClientError + + self.db._runs_table.query.side_effect = self._client_error("AccessDeniedException") + with self.assertRaises(ClientError): + self.db.get_runs_for_workflow("wf1") + + +class TestDynamoDBStatsDatabaseOperations(TestCase): + """Tests for 
DynamoDBStatsDatabase CRUD operations via mocked boto3 tables.""" + + def setUp(self): + self.db = _make_ddb() + # Several methods import from boto3.dynamodb.conditions lazily; stub it out. + self._boto3_patch = patch.dict( + "sys.modules", + { + "boto3": MagicMock(), + "boto3.dynamodb": MagicMock(), + "boto3.dynamodb.conditions": MagicMock(), + }, + ) + self._boto3_patch.start() + + def tearDown(self): + self._boto3_patch.stop() + + def test_disabled_database_does_not_call_put_item(self): + db = _make_ddb(enabled=False) + db.start_run("run1", "wf1") + db._runs_table.put_item.assert_not_called() + + def test_disabled_methods_return_empty(self): + db = _make_ddb(enabled=False) + self.assertEqual(db.get_task_history("sig1"), []) + self.assertEqual(db.get_run_results("run1"), []) + self.assertIsNone(db.get_run_info("run1")) + self.assertEqual(db.get_runs_for_workflow("wf1"), []) + + def test_start_run_puts_item_with_correct_fields(self): + self.db.start_run("run1", "taskfile1", "/path/tasks.txt", 5) + self.db._runs_table.put_item.assert_called_once() + item = self.db._runs_table.put_item.call_args[1]["Item"] + self.assertEqual(item["run_id"], "run1") + self.assertEqual(item["workflow"], "taskfile1") + self.assertEqual(item["task_count"], 5) + + def test_complete_run_updates_item(self): + self.db.complete_run("run1", status="Success", success_count=3, failure_count=1) + self.db._runs_table.update_item.assert_called_once() + call_kwargs = self.db._runs_table.update_item.call_args[1] + self.assertEqual(call_kwargs["Key"], {"run_id": "run1"}) + self.assertEqual(call_kwargs["ExpressionAttributeValues"][":status"], "Success") + + def test_get_run_info_queries_by_run_id(self): + self.db._runs_table.get_item.return_value = {"Item": _MOCK_RUN_ITEM} + result = self.db.get_run_info("run1") + self.db._runs_table.get_item.assert_called_once_with(Key={"run_id": "run1"}) + self.assertEqual(result["run_id"], "run1") + + def test_get_run_info_returns_none_when_not_found(self): + 
self.db._runs_table.get_item.return_value = {} + result = self.db.get_run_info("missing") + self.assertIsNone(result) + + def test_get_run_results_returns_float_duration_and_int_retry(self): + self.db._task_results_table.query.return_value = {"Items": [_MOCK_TASK_ITEM]} + results = self.db.get_run_results("run1") + self.assertEqual(len(results), 1) + self.assertIsInstance(results[0]["duration_seconds"], float) + self.assertAlmostEqual(results[0]["duration_seconds"], 5.123, places=3) + self.assertIsInstance(results[0]["retry_count"], int) + self.assertEqual(results[0]["retry_count"], 2) + + def test_record_task_puts_item_with_correct_fields(self): + start = datetime(2024, 1, 1, 10, 0, 0) + end = start + timedelta(seconds=5) + self.db.record_task( + run_id="run1", + task_id="task1", + instance="inst1", + process="proc1", + parameters={"p": "v"}, + success=True, + start_time=start, + end_time=end, + ) + self.db._task_results_table.put_item.assert_called_once() + item = self.db._task_results_table.put_item.call_args[1]["Item"] + self.assertEqual(item["run_id"], "run1") + self.assertEqual(item["task_id"], "task1") + self.assertEqual(item["status"], "Success") + + def test_record_failed_task_sets_fail_status(self): + start = datetime(2024, 1, 1, 10, 0, 0) + end = start + timedelta(seconds=1) + self.db.record_task( + run_id="run1", + task_id="task1", + instance="inst1", + process="proc1", + parameters={}, + success=False, + start_time=start, + end_time=end, + error_message="Process failed", + ) + item = self.db._task_results_table.put_item.call_args[1]["Item"] + self.assertEqual(item["status"], "Fail") + self.assertEqual(item["error_message"], "Process failed")