From bb6f428231de9e8d4bf5424f745aa85f6b3783e5 Mon Sep 17 00:00:00 2001 From: Tomas Gonzalez Date: Thu, 19 Jun 2025 12:09:55 -0400 Subject: [PATCH] add links to workbench and refactor gate handler --- src/workbench_cli/handlers/evaluate_gates.py | 417 +++++--- src/workbench_cli/utilities/scan_workflows.py | 54 ++ tests/unit/utilities/test_scan_workflows.py | 895 ++++++++++++------ 3 files changed, 947 insertions(+), 419 deletions(-) diff --git a/src/workbench_cli/handlers/evaluate_gates.py b/src/workbench_cli/handlers/evaluate_gates.py index c77dbd5..aecf75d 100644 --- a/src/workbench_cli/handlers/evaluate_gates.py +++ b/src/workbench_cli/handlers/evaluate_gates.py @@ -2,7 +2,8 @@ import logging import argparse -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Dict, Any, Optional, Tuple +from dataclasses import dataclass from ..api import WorkbenchAPI from ..utilities.error_handling import handler_error_wrapper @@ -13,7 +14,7 @@ ProcessTimeoutError, ValidationError ) -from ..utilities.scan_workflows import wait_for_scan_completion +from ..utilities.scan_workflows import wait_for_scan_completion, get_workbench_links # Get logger from the handlers package from . import logger @@ -21,129 +22,213 @@ if TYPE_CHECKING: from ..api import WorkbenchAPI -@handler_error_wrapper -def handle_evaluate_gates(workbench: "WorkbenchAPI", params: "argparse.Namespace") -> bool: +# Constants +MAX_DISPLAY_FILES = 10 +SEVERITY_ORDER = ["critical", "high", "medium", "low"] + +@dataclass +class GateResult: + """Data class to represent the result of a gate check.""" + passed: bool + count: int + message: str + link_key: Optional[str] = None + +@dataclass +class GateResults: + """Container for all gate check results.""" + pending_files: GateResult + policy_warnings: GateResult + vulnerabilities: GateResult + + @property + def all_passed(self) -> bool: + return self.pending_files.passed and self.policy_warnings.passed and self.vulnerabilities.passed + +def _extract_policy_count(policy_data: Any) -> int: """ - Handler for the 'evaluate-gates' command. - Checks scan status and evaluates policy warnings. + Extract policy warning count from API response, handling different response formats. 
Args: - workbench: The Workbench API client - params: Command line parameters + policy_data: The API response data Returns: - bool: True if all gates passed, False if any gate failed - - Raises: - Various exceptions based on errors that occur during the process + int: The policy warning count """ - print(f"\n--- Running {params.command.upper()} Command ---") + if not isinstance(policy_data, dict): + logger.warning(f"Unexpected policy warnings data format: {policy_data}") + return 0 - # Resolve project and scan (find only) - print("\nResolving scan for gate evaluation...") - project_code = workbench.resolve_project(params.project_name, create_if_missing=False) - scan_code, scan_id = workbench.resolve_scan( - scan_name=params.scan_name, - project_name=params.project_name, - create_if_missing=False, - params=params - ) + # Handle nested data structure + if "data" in policy_data and isinstance(policy_data["data"], dict): + return policy_data["data"].get("policy_warnings_total", 0) - # Wait for scan and dependency analysis to complete - print("\nVerifying scan completion...") - scan_completed, da_completed, _ = wait_for_scan_completion(workbench, params, scan_code) + # Handle flat structure with fallback + return policy_data.get("policy_warnings_total", policy_data.get("total", 0)) + +def _display_pending_files(pending_files: Dict[str, str], count: int, show_files: bool) -> None: + """ + Display pending files information. - if not scan_completed: - print("\n❌ Gate Evaluation Failed: KB Scan has not completed successfully.") - return False + Args: + pending_files: Dictionary of file IDs to file paths + count: Total count of pending files + show_files: Whether to show individual file paths + """ + if not show_files or count == 0: + return - # Track gate pass/fail states - pending_files_gate_passed = True - policy_gate_passed = True - vuln_gate_passed = True + print("\nPending Files:") + file_items = list(pending_files.items()) - # Check for pending files - always check this + for i, (_, file_path) in enumerate(file_items): + if i >= MAX_DISPLAY_FILES: + break + print(f" {file_path}") + + if count > MAX_DISPLAY_FILES: + remaining = count - MAX_DISPLAY_FILES + print(f" ... and {remaining} more files (showing first {MAX_DISPLAY_FILES} of {count} total)") + +def _display_vulnerability_breakdown(vuln_counts: Dict[str, int]) -> None: + """ + Display vulnerability counts by severity. + + Args: + vuln_counts: Dictionary of severity levels to counts + """ + total_vulns = sum(vuln_counts.values()) + print(f"\n⚠️ Warning: Found {total_vulns} vulnerabilities. By CVSS Score:") + + for severity in SEVERITY_ORDER: + if vuln_counts[severity] > 0: + print(f" - {severity.upper()}: {vuln_counts[severity]}") + +def _check_pending_files_gate(workbench: "WorkbenchAPI", scan_code: str, params: "argparse.Namespace") -> GateResult: + """ + Check the pending files gate. 
+ + Args: + workbench: The Workbench API client + scan_code: The scan identifier + params: Command line parameters + + Returns: + GateResult: The result of the pending files check + """ print("\nChecking for pending files...") - pending_count = 0 + pending_files = {} + count = 0 + try: pending_files = workbench.get_pending_files(scan_code) - pending_count = len(pending_files) + count = len(pending_files) except (ApiError, NetworkError) as e: print(f"\n⚠️ Warning: Failed to check for pending files: {e}") logger.warning(f"Error checking pending files for scan '{scan_code}': {e}") - # Only fail if pending files check is explicitly required + if params.fail_on_pending: - pending_files_gate_passed = False - print(f"\n❌ Gate Failed: Unable to verify pending files status due to API error") - pending_files = {} + return GateResult( + passed=False, + count=0, + message="❌ Gate Failed: Unable to verify pending files status due to API error" + ) - if pending_count > 0: + # Determine gate result + if count > 0: + _display_pending_files(pending_files, count, getattr(params, 'show_pending_files', False)) + if params.fail_on_pending: - print(f"\n❌ Gate Failed: Found {pending_count} pending files that require identification.") - pending_files_gate_passed = False + return GateResult( + passed=False, + count=count, + message=f"❌ Gate Failed: Found {count} pending files that require identification.", + link_key="pending" + ) else: - print(f"\n⚠️ Warning: Found {pending_count} pending files that require identification.") + print(f"\n⚠️ Warning: Found {count} pending files that require identification.") print("Note: Gate is not set to fail on pending files (--fail-on-pending not specified).") - - # Display pending files if requested - show_pending_files = getattr(params, 'show_pending_files', False) - if show_pending_files and pending_count > 0: - print("\nPending Files:") - # Limit display to first 25 files - file_items = list(pending_files.items()) - for i, (file_id, file_path) in enumerate(file_items): - if i >= 25: - break - print(f" {file_path}") - - # Show a message if there are more files than displayed - if pending_count > 25: - print(f" ... and {pending_count - 25} more files (showing first 25 of {pending_count} total)") + return GateResult( + passed=True, + count=count, + message=f"Found {count} pending files", + link_key="pending" + ) else: print("\n✅ No pending files found - all files have been identified.") + return GateResult(passed=True, count=0, message="No pending files found") + +def _check_policy_warnings_gate(workbench: "WorkbenchAPI", scan_code: str, params: "argparse.Namespace") -> GateResult: + """ + Check the policy warnings gate. 
+ + Args: + workbench: The Workbench API client + scan_code: The scan identifier + params: Command line parameters + + Returns: + GateResult: The result of the policy warnings check + """ + print("\nChecking for license policy warnings...") - # Check for policy warnings - always check this - print("\nChecking for policy warnings...") - policy_data = None try: policy_data = workbench.get_policy_warnings_counter(scan_code) + count = _extract_policy_count(policy_data) - # Extract the count correctly based on the API response structure - # The API might return {data: {policy_warnings_total: N}} or just {policy_warnings_total: N} - if isinstance(policy_data, dict): - if "data" in policy_data and isinstance(policy_data["data"], dict): - policy_warning_count = policy_data["data"].get("policy_warnings_total", 0) - else: - policy_warning_count = policy_data.get("policy_warnings_total", - policy_data.get("total", 0)) # Fallback to 'total' for backward compatibility - else: - policy_warning_count = 0 - logger.warning(f"Unexpected policy warnings data format: {policy_data}") - - if policy_warning_count > 0: + if count > 0: if params.fail_on_policy: - print(f"\n❌ Gate Failed: Found {policy_warning_count} policy warnings.") - policy_gate_passed = False + return GateResult( + passed=False, + count=count, + message=f"❌ Gate Failed: Found {count} policy warnings.", + link_key="policy" + ) else: - print(f"\n⚠️ Warning: Found {policy_warning_count} policy warnings.") - print("Note: Gate is not set to fail on policy (--fail-on-policy not specified).") + print(f"\n⚠️ Warning: Found {count} license policy warnings.") + print("Note: Gate is not set to fail on license policy warnings (--fail-on-policy not specified).") + return GateResult( + passed=True, + count=count, + message=f"Found {count} policy warnings", + link_key="policy" + ) else: print("\n✅ No policy warnings found.") + return GateResult(passed=True, count=0, message="No policy warnings found") + except (ApiError, NetworkError) as e: print(f"\n⚠️ Warning: Failed to check for policy warnings: {e}") logger.warning(f"Error checking policy warnings for scan '{scan_code}': {e}") + if params.fail_on_policy: - policy_gate_passed = False - print(f"\n❌ Gate Failed: Unable to verify policy warnings status due to API error") + return GateResult( + passed=False, + count=0, + message="❌ Gate Failed: Unable to verify policy warnings status due to API error" + ) + else: + return GateResult(passed=True, count=0, message="Policy check failed") + +def _check_vulnerabilities_gate(workbench: "WorkbenchAPI", scan_code: str, params: "argparse.Namespace") -> GateResult: + """ + Check the vulnerabilities gate. 
- # Check for vulnerabilities - always check this + Args: + workbench: The Workbench API client + scan_code: The scan identifier + params: Command line parameters + + Returns: + GateResult: The result of the vulnerabilities check + """ print("\nChecking for vulnerabilities...") + try: vulnerabilities = workbench.list_vulnerabilities(scan_code) # Count vulnerabilities by severity vuln_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "other": 0} - severities_order = ["critical", "high", "medium", "low"] for vuln in vulnerabilities: severity = vuln.get("severity", "").lower() @@ -155,78 +240,158 @@ def handle_evaluate_gates(workbench: "WorkbenchAPI", params: "argparse.Namespace total_vulns = sum(vuln_counts.values()) if total_vulns > 0: - # Check if we should fail based on severity threshold if params.fail_on_vuln_severity: - threshold_idx = severities_order.index(params.fail_on_vuln_severity) - has_threshold_vulns = False + threshold_idx = SEVERITY_ORDER.index(params.fail_on_vuln_severity) - for severity in severities_order[:threshold_idx + 1]: + for severity in SEVERITY_ORDER[:threshold_idx + 1]: if vuln_counts[severity] > 0: - has_threshold_vulns = True - print(f"\n❌ Gate Failed: Found vulnerabilities with severity {severity.upper()} (threshold: {params.fail_on_vuln_severity.upper()})") - vuln_gate_passed = False - break + return GateResult( + passed=False, + count=total_vulns, + message=f"❌ Gate Failed: Found vulnerabilities with severity {severity.upper()} (threshold: {params.fail_on_vuln_severity.upper()})", + link_key="vulnerabilities" + ) - if vuln_gate_passed: - print(f"\n✅ No vulnerabilities found with severity {params.fail_on_vuln_severity.upper()} or higher.") + print(f"\n✅ No vulnerabilities found with severity {params.fail_on_vuln_severity.upper()} or higher.") + return GateResult(passed=True, count=total_vulns, message=f"Found {total_vulns} vulnerabilities below threshold") else: - # If vulnerabilities exist but gate is not set to fail - print(f"\n⚠️ Warning: Found {total_vulns} vulnerabilities. By CVSS Score:") - for severity in severities_order: - if vuln_counts[severity] > 0: - print(f" - {severity.upper()}: {vuln_counts[severity]}") + _display_vulnerability_breakdown(vuln_counts) print("Note: Gate is not set to fail on vulnerabilities (--fail-on-vuln-severity not specified).") + return GateResult(passed=True, count=total_vulns, message=f"Found {total_vulns} vulnerabilities") else: print("\n✅ No vulnerabilities found.") + return GateResult(passed=True, count=0, message="No vulnerabilities found") + except (ApiError, NetworkError) as e: print(f"\n⚠️ Warning: Failed to check for vulnerabilities: {e}") logger.warning(f"Error checking vulnerabilities for scan '{scan_code}': {e}") + if params.fail_on_vuln_severity: - vuln_gate_passed = False - print(f"\n❌ Gate Failed: Unable to verify vulnerabilities status due to API error") + return GateResult( + passed=False, + count=0, + message="❌ Gate Failed: Unable to verify vulnerabilities status due to API error" + ) + else: + return GateResult(passed=True, count=0, message="Vulnerability check failed") + +def _display_workbench_links(workbench_links: Optional[Dict[str, Any]], results: GateResults) -> None: + """ + Display relevant Workbench links based on gate results. 
+ + Args: + workbench_links: Dictionary of workbench links + results: The gate results containing link information + """ + if not workbench_links: + return - # Final gate evaluation summary + # Show specific links for failed or warning gates + for result in [results.pending_files, results.policy_warnings, results.vulnerabilities]: + if result.link_key and result.link_key in workbench_links and result.count > 0: + link_info = workbench_links[result.link_key] + print(f"\n🔗 {link_info['message']}: {link_info['url']}") + +def _print_gate_summary(params: "argparse.Namespace", results: GateResults) -> None: + """ + Print the final gate evaluation summary. + + Args: + params: Command line parameters + results: The gate results + """ print("\n" + "="*50) print("Gate Evaluation Summary:") print("="*50) - all_gates_passed = pending_files_gate_passed and policy_gate_passed and vuln_gate_passed - + # Pending files summary if params.fail_on_pending: - status = "✅ PASSED" if pending_files_gate_passed else "❌ FAILED" - print(f"Pending Files Gate: {status} ({pending_count} pending files)") + status = "✅ PASSED" if results.pending_files.passed else "❌ FAILED" + print(f"Pending Files Gate: {status} ({results.pending_files.count} pending files)") else: - print(f"Pending Files: {pending_count} files {'✅' if pending_count == 0 else '⚠️'}") + icon = "✅" if results.pending_files.count == 0 else "⚠️" + print(f"Pending Files: {results.pending_files.count} files {icon}") + # Policy warnings summary if params.fail_on_policy: - if policy_data and isinstance(policy_data, dict): - if 'data' in policy_data and isinstance(policy_data['data'], dict): - policy_count = policy_data['data'].get('policy_warnings_total', 0) - else: - policy_count = policy_data.get('policy_warnings_total', policy_data.get('total', 0)) - else: - policy_count = "Not Checked" - status = "✅ PASSED" if policy_gate_passed else "❌ FAILED" - print(f"Policy Warnings Gate: {status} ({policy_count} warnings)") + status = "✅ PASSED" if results.policy_warnings.passed else "❌ FAILED" + print(f"Policy Warnings Gate: {status} ({results.policy_warnings.count} warnings)") else: - policy_count = "Not Checked" - if policy_data and isinstance(policy_data, dict): - if 'data' in policy_data and isinstance(policy_data['data'], dict): - policy_count = policy_data['data'].get('policy_warnings_total', 0) - else: - policy_count = policy_data.get('policy_warnings_total', policy_data.get('total', 0)) - print(f"Policy Warnings: {policy_count} warnings {'✅' if policy_count == 0 or policy_count == 'Not Checked' else '⚠️'}") + icon = "✅" if results.policy_warnings.count == 0 else "⚠️" + print(f"Policy Warnings: {results.policy_warnings.count} warnings {icon}") + # Vulnerabilities summary if params.fail_on_vuln_severity: - status = "✅ PASSED" if vuln_gate_passed else "❌ FAILED" + status = "✅ PASSED" if results.vulnerabilities.passed else "❌ FAILED" print(f"Vulnerability Gate: {status} (Threshold: {params.fail_on_vuln_severity.upper()})") else: - total_vulns = sum(vuln_counts.values()) if 'vuln_counts' in locals() else "Not Checked" - print(f"Vulnerabilities: {total_vulns} {'✅' if total_vulns == 0 or total_vulns == 'Not Checked' else '⚠️'}") + icon = "✅" if results.vulnerabilities.count == 0 else "⚠️" + print(f"Vulnerabilities: {results.vulnerabilities.count} {icon}") print("="*50) - print(f"Overall Gate Status: {'✅ PASSED' if all_gates_passed else '❌ FAILED'}") + status = "✅ PASSED" if results.all_passed else "❌ FAILED" + print(f"Overall Gate Status: {status}") print("="*50) + 
+@handler_error_wrapper +def handle_evaluate_gates(workbench: "WorkbenchAPI", params: "argparse.Namespace") -> bool: + """ + Handler for the 'evaluate-gates' command. + Checks scan status and evaluates policy warnings. + + Args: + workbench: The Workbench API client + params: Command line parameters + + Returns: + bool: True if all gates passed, False if any gate failed + + Raises: + Various exceptions based on errors that occur during the process + """ + print(f"\n--- Running {params.command.upper()} Command ---") + + # Resolve project and scan (find only) + print("\nResolving scan for gate evaluation...") + project_code = workbench.resolve_project(params.project_name, create_if_missing=False) + scan_code, scan_id = workbench.resolve_scan( + scan_name=params.scan_name, + project_name=params.project_name, + create_if_missing=False, + params=params + ) + + # Wait for scan and dependency analysis to complete + print("\nVerifying scan completion...") + scan_completed, da_completed, _ = wait_for_scan_completion(workbench, params, scan_code) + + if not scan_completed: + print("\n❌ Gate Evaluation Failed: KB Scan has not completed successfully.") + return False + + # Generate all Workbench links once for use throughout the handler + workbench_links = None + try: + workbench_links = get_workbench_links(workbench.api_url, scan_id) + except Exception as e: + logger.debug(f"Failed to generate Workbench links: {e}") + + # Run all gate checks + results = GateResults( + pending_files=_check_pending_files_gate(workbench, scan_code, params), + policy_warnings=_check_policy_warnings_gate(workbench, scan_code, params), + vulnerabilities=_check_vulnerabilities_gate(workbench, scan_code, params) + ) + + # Display any relevant Workbench links + _display_workbench_links(workbench_links, results) + + # Print final summary + _print_gate_summary(params, results) + + # Show main scan link for users to review results + if workbench_links and "main" in workbench_links: + print(f"\n🔗 {workbench_links['main']['message']}: {workbench_links['main']['url']}") - return all_gates_passed + return results.all_passed diff --git a/src/workbench_cli/utilities/scan_workflows.py b/src/workbench_cli/utilities/scan_workflows.py index ee6920d..d1dbd62 100644 --- a/src/workbench_cli/utilities/scan_workflows.py +++ b/src/workbench_cli/utilities/scan_workflows.py @@ -18,6 +18,60 @@ logger = logging.getLogger("workbench-cli") +# --- Workbench UI Link Generation --- + +def get_workbench_links(api_url: str, scan_id: int) -> Dict[str, Dict[str, str]]: + """ + Get all Workbench UI links and messages for a scan. 
+
+    Args:
+        api_url: The Workbench API URL (includes /api.php)
+        scan_id: The scan ID
+
+    Returns:
+        Dict with link types as keys, each containing 'url' and 'message'
+        Example: {
+            "main": {"url": "https://...", "message": "View scan results..."},
+            "pending": {"url": "https://...", "message": "Review Pending IDs..."},
+            "policy": {"url": "https://...", "message": "Review policy warnings..."}
+        }
+    """
+    # Link type configuration
+    link_config = {
+        "main": {
+            "view_param": None,
+            "message": "View scan results in Workbench"
+        },
+        "pending": {
+            "view_param": "pending_items",
+            "message": "Review Pending IDs in Workbench"
+        },
+        "policy": {
+            "view_param": "mark_as_identified",
+            "message": "Review policy warnings in Workbench"
+        },
+    }
+
+    # Build base URL once
+    base_url = api_url.replace("/api.php", "").rstrip("/")
+
+    # Build all links
+    links = {}
+    for link_type, config in link_config.items():
+        url = f"{base_url}/index.html?form=main_interface&action=scanview&sid={scan_id}"
+        if config["view_param"]:
+            url += f"&current_view={config['view_param']}"
+
+        links[link_type] = {
+            "url": url,
+            "message": config["message"]
+        }
+
+    return links
+
+
+
 # --- Process Waiters and Checkers ---
 
 def assert_scan_is_idle(
diff --git a/tests/unit/utilities/test_scan_workflows.py b/tests/unit/utilities/test_scan_workflows.py
index 3ac90f4..8c7657e 100644
--- a/tests/unit/utilities/test_scan_workflows.py
+++ b/tests/unit/utilities/test_scan_workflows.py
@@ -1,9 +1,14 @@
+"""
+Test suite for scan_workflows.py utilities.
+
+This module contains comprehensive tests for all scan workflow utility functions
+including link generation, scan status management, and result processing.
+"""
+
 import pytest
 import argparse
-import time
 import json
 import os
-import requests
 from unittest.mock import MagicMock, patch, mock_open, call
 from typing import Dict, Any
 
@@ -17,353 +22,657 @@
     fetch_display_save_results,
     format_duration,
     print_operation_summary,
+    get_workbench_links,
 )
 from workbench_cli.exceptions import (
-    WorkbenchCLIError,
     ApiError,
     NetworkError,
-    ConfigurationError,
     ProcessError,
-    ProcessTimeoutError,
-    FileSystemError,
-    ValidationError,
-    CompatibilityError,
-    ProjectNotFoundError,
     ScanNotFoundError,
-    ProjectExistsError,
-    ScanExistsError
 )
 
-# --- Fixtures ---
+# ============================================================================
+# TEST CONSTANTS
+# ============================================================================
+
+# Common test data
+TEST_SCAN_CODE = "TEST_SCAN_12345"
+TEST_PROJECT_CODE = "TEST_PROJECT_67890"
+TEST_SCAN_ID = 123456
+TEST_API_URL = "https://workbench.example.com/api.php"
+TEST_BASE_URL = "https://workbench.example.com"
+
+# Sample test data
+SAMPLE_PROJECT_DATA = {
+    "name": "test_project",
+    "code": TEST_PROJECT_CODE,
+    "project_name": "test_project",
+    "project_code": TEST_PROJECT_CODE
+}
+
+SAMPLE_SCAN_DATA = {
+    "name": "test_scan",
+    "code": TEST_SCAN_CODE,
+    "id": str(TEST_SCAN_ID),
+    "project_code": TEST_PROJECT_CODE
+}
+
+SAMPLE_VULNERABILITY_DATA = {
+    "cve": "CVE-2021-1234",
+    "severity": "HIGH",
+    "component_name": "test_component",
+    "component_version": "1.0.0"
+}
+
+SAMPLE_LICENSE_DATA = {
+    "identifier": "MIT",
+    "name": "MIT License"
+}
+
+SAMPLE_DEPENDENCY_DATA = {
+    "name": "test_dependency",
+    "version": "2.1.0",
+    "license_identifier": "Apache-2.0"
+}
+
+# Duration test cases
+DURATION_TEST_CASES = [
+    (0, "0 seconds"),
+    (1, "1 second"),
+    (59, "59 seconds"),
+    (60, "1 minutes"),
+    (61, "1 minutes, 1 seconds"),
+    (119, "1 minutes, 59 
seconds"), + (120, "2 minutes"), + (121, "2 minutes, 1 seconds"), + (3600, "60 minutes"), + (3661, "61 minutes, 1 seconds"), + (7322.5, "122 minutes, 2 seconds"), # Test rounding + (None, "N/A"), + ("invalid", "Invalid Duration"), +] + +# API URL variants for testing +API_URL_VARIANTS = [ + "https://example.com/api.php", + "https://example.com/api.php/", + "https://example.com/", + "https://example.com", + "http://localhost:8080/api.php", + "http://localhost:8080/fossid/api.php" +] + +# Expected link messages +EXPECTED_MESSAGES = { + "main": "View scan results in Workbench", + "pending": "Review Pending IDs in Workbench", + "policy": "Review policy warnings in Workbench" +} + +# ============================================================================ +# FIXTURES +# ============================================================================ + @pytest.fixture def mock_workbench(mocker): + """Create a comprehensive mock WorkbenchAPI instance.""" workbench = mocker.MagicMock() - workbench.list_projects.return_value = [ - {"name": "test_project", "code": "TEST_PROJECT", "project_name": "test_project", "project_code": "TEST_PROJECT"} - ] - workbench.get_project_scans.return_value = [ - {"name": "test_scan", "code": "TEST_SCAN", "id": "123", "project_code": "TEST_PROJECT"} - ] - workbench.list_scans.return_value = [ - {"name": "test_scan", "code": "TEST_SCAN", "id": "123"} - ] + + # Basic data returns + workbench.list_projects.return_value = [SAMPLE_PROJECT_DATA] + workbench.get_project_scans.return_value = [SAMPLE_SCAN_DATA] + workbench.list_scans.return_value = [SAMPLE_SCAN_DATA] + + # Status and process management workbench.assert_process_can_start = mocker.MagicMock(return_value=None) workbench.get_scan_status = mocker.MagicMock() workbench.check_status_download_content_from_git = mocker.MagicMock() workbench._is_status_check_supported = mocker.MagicMock() workbench._standard_scan_status_accessor = mocker.MagicMock() + + # Wait operations workbench.wait_for_git_clone = mocker.MagicMock() workbench.wait_for_archive_extraction = mocker.MagicMock() workbench.wait_for_scan_to_finish = mocker.MagicMock() + + # Data retrieval workbench.get_dependency_analysis_results = mocker.MagicMock() workbench.list_vulnerabilities = mocker.MagicMock() workbench.get_scan_identified_licenses = mocker.MagicMock() workbench.get_scan_identified_components = mocker.MagicMock() workbench.get_scan_folder_metrics = mocker.MagicMock() workbench.get_policy_warnings_counter = mocker.MagicMock() + return workbench + @pytest.fixture def mock_params(mocker): + """Create a mock argparse.Namespace with common default values.""" params = mocker.MagicMock(spec=argparse.Namespace) + + # Scan configuration params.scan_number_of_tries = 60 params.scan_wait_time = 5 - params.command = None - params.project_name = None - params.scan_name = None + params.command = "scan" + + # Project and scan identification + params.project_name = "test_project" + params.scan_name = "test_scan" + + # Git parameters params.git_url = None params.git_branch = None params.git_tag = None params.git_depth = None + + # Reuse settings params.id_reuse = False params.id_reuse_type = None params.id_reuse_source = None + + # Display flags - all False by default params.show_licenses = False params.show_components = False params.show_dependencies = False params.show_scan_metrics = False params.show_policy_warnings = False params.show_vulnerabilities = False + + # Output settings params.path_result = None + + # Analysis flags params.run_dependency_analysis = 
False params.dependency_analysis_only = False + return params -# --- Tests for format_duration (migrated from old test_utils.py) --- -@pytest.mark.parametrize("seconds, expected", [ - (0, "0 seconds"), - (1, "1 second"), - (59, "59 seconds"), - (60, "1 minutes"), - (61, "1 minutes, 1 seconds"), - (119, "1 minutes, 59 seconds"), - (120, "2 minutes"), - (121, "2 minutes, 1 seconds"), - (3600, "60 minutes"), - (3661, "61 minutes, 1 seconds"), - (7322.5, "122 minutes, 2 seconds"), # Test rounding - (None, "N/A"), - ("abc", "Invalid Duration"), -]) -def test_format_duration(seconds, expected): - assert format_duration(seconds) == expected - -# --- Tests for save_results_to_file (migrated from old test_utils.py) --- -@patch("builtins.open", new_callable=mock_open) -@patch("os.makedirs") -def test_save_results_to_file_success(mock_makedirs, mock_open_file): - filepath = "output/results.json" - results = {"key": "value"} - scan_code = "TEST_SCAN" - save_results_to_file(filepath, results, scan_code) - mock_makedirs.assert_called_once_with("output", exist_ok=True) - # Check file was opened for writing - mock_open_file.assert_any_call(filepath, 'w', encoding='utf-8') - # Join all write calls to get the full written content - handle = mock_open_file() - written = "".join(call_arg[0][0] for call_arg in handle.write.call_args_list) - assert json.loads(written) == results - -@patch("os.makedirs", side_effect=OSError("Cannot create dir")) -def test_save_results_to_file_makedirs_error(mock_makedirs): - filepath = "output/results.json" - results = {"key": "value"} - scan_code = "TEST_SCAN" - save_results_to_file(filepath, results, scan_code) - mock_makedirs.assert_called_once_with("output", exist_ok=True) - # No file open should be attempted if makedirs fails - -@patch("builtins.open", new_callable=mock_open) -@patch("os.makedirs") -def test_save_results_to_file_write_error(mock_makedirs, mock_open_file): - filepath = "output/results.json" - results = {"key": "value"} - scan_code = "TEST_SCAN" - # Simulate write error - handle = mock_open_file() - handle.write.side_effect = IOError("Cannot write file") - save_results_to_file(filepath, results, scan_code) - mock_makedirs.assert_called_once_with("output", exist_ok=True) - mock_open_file.assert_any_call(filepath, 'w', encoding='utf-8') - # The write will fail, but the function should handle/log the error - -# --- Tests for assert_scan_is_idle --- -def test_assert_scan_is_idle_all_idle(mock_workbench, mock_params): - """Test when all processes are already idle.""" - mock_workbench.check_status_download_content_from_git.return_value = "FINISHED" - mock_workbench.get_scan_status.return_value = {"status": "FINISHED"} - mock_workbench._standard_scan_status_accessor.return_value = "FINISHED" - - # Should not raise and should check statuses - assert_scan_is_idle(mock_workbench, "TEST_SCAN", mock_params, ["GIT_CLONE", "SCAN"]) - - mock_workbench.check_status_download_content_from_git.assert_called_with("TEST_SCAN") - mock_workbench.get_scan_status.assert_called_with("SCAN", "TEST_SCAN") - -def test_assert_scan_is_idle_scan_not_found(mock_workbench, mock_params): - """Test when scan is not found during idle check.""" - mock_workbench.check_status_download_content_from_git.side_effect = ScanNotFoundError("Not found") - - # Should not raise and should handle gracefully - assert_scan_is_idle(mock_workbench, "TEST_SCAN", mock_params, ["GIT_CLONE"]) - -def test_assert_scan_is_idle_api_error(mock_workbench, mock_params): - """Test API error during idle check.""" - 
mock_workbench.check_status_download_content_from_git.side_effect = ApiError("API Error") - - with pytest.raises(ProcessError, match="Failed to check status"): - assert_scan_is_idle(mock_workbench, "TEST_SCAN", mock_params, ["GIT_CLONE"]) - -def test_assert_scan_is_idle_extract_archives_not_supported(mock_workbench, mock_params): - """Test extract archives when status checking is not supported.""" - mock_workbench._is_status_check_supported.return_value = False - - # Should complete without errors - assert_scan_is_idle(mock_workbench, "TEST_SCAN", mock_params, ["EXTRACT_ARCHIVES"]) - - mock_workbench._is_status_check_supported.assert_called_with("TEST_SCAN", "EXTRACT_ARCHIVES") - -# --- Tests for wait_for_scan_completion --- -def test_wait_for_scan_completion_both_finished(mock_workbench, mock_params): - """Test when both KB scan and DA are already finished.""" - mock_workbench.get_scan_status.return_value = {"status": "FINISHED"} - mock_workbench._standard_scan_status_accessor.return_value = "FINISHED" - - scan_completed, da_completed, durations = wait_for_scan_completion(mock_workbench, mock_params, "TEST_SCAN") - - assert scan_completed is True - assert da_completed is True - assert "kb_scan" in durations - assert "dependency_analysis" in durations - -def test_wait_for_scan_completion_kb_scan_failed(mock_workbench, mock_params): - """Test when KB scan has failed.""" - mock_workbench.get_scan_status.return_value = {"status": "FAILED"} - mock_workbench._standard_scan_status_accessor.return_value = "FAILED" - - scan_completed, da_completed, durations = wait_for_scan_completion(mock_workbench, mock_params, "TEST_SCAN") - - assert scan_completed is False - assert da_completed is False - -def test_wait_for_scan_completion_da_new(mock_workbench, mock_params): - """Test when DA has not been run (status = NEW).""" - mock_workbench.get_scan_status.side_effect = [ - {"status": "FINISHED"}, # KB scan - {"status": "NEW"} # DA - ] - mock_workbench._standard_scan_status_accessor.side_effect = ["FINISHED", "NEW"] - - scan_completed, da_completed, durations = wait_for_scan_completion(mock_workbench, mock_params, "TEST_SCAN") - - assert scan_completed is True - assert da_completed is False - -# --- Tests for determine_scans_to_run --- -def test_determine_scans_to_run_default(mock_params): - """Test default behavior - only KB scan.""" - mock_params.run_dependency_analysis = False - mock_params.dependency_analysis_only = False - - result = determine_scans_to_run(mock_params) - - assert result == {"run_kb_scan": True, "run_dependency_analysis": False} -def test_determine_scans_to_run_with_da(mock_params): - """Test with dependency analysis enabled.""" - mock_params.run_dependency_analysis = True - mock_params.dependency_analysis_only = False - - result = determine_scans_to_run(mock_params) - - assert result == {"run_kb_scan": True, "run_dependency_analysis": True} +@pytest.fixture +def sample_results_data(): + """Provide sample results data for testing.""" + return { + "dependency_analysis": [SAMPLE_DEPENDENCY_DATA], + "vulnerabilities": [SAMPLE_VULNERABILITY_DATA], + "kb_licenses": [SAMPLE_LICENSE_DATA] + } -def test_determine_scans_to_run_da_only(mock_params): - """Test with dependency analysis only.""" - mock_params.run_dependency_analysis = False - mock_params.dependency_analysis_only = True - - result = determine_scans_to_run(mock_params) - - assert result == {"run_kb_scan": False, "run_dependency_analysis": True} +# ============================================================================ +# HELPER 
FUNCTIONS +# ============================================================================ -def test_determine_scans_to_run_both_flags(mock_params): - """Test with both DA flags - should use DA only.""" - mock_params.run_dependency_analysis = True - mock_params.dependency_analysis_only = True - - result = determine_scans_to_run(mock_params) - - assert result == {"run_kb_scan": False, "run_dependency_analysis": True} +def create_mock_status_response(status: str) -> Dict[str, str]: + """Create a mock scan status response.""" + return {"status": status} -# --- Tests for fetch_results --- -def test_fetch_results_no_flags(mock_workbench, mock_params): - """Test when no result flags are set.""" - result = fetch_results(mock_workbench, mock_params, "TEST_SCAN") - - assert result == {} -def test_fetch_results_licenses(mock_workbench, mock_params): - """Test fetching license results.""" - mock_params.show_licenses = True - mock_workbench.get_dependency_analysis_results.return_value = {"licenses": ["MIT", "GPL"]} - - result = fetch_results(mock_workbench, mock_params, "TEST_SCAN") - - assert "dependency_analysis" in result - mock_workbench.get_dependency_analysis_results.assert_called_once_with("TEST_SCAN") +def assert_url_structure(url: str, scan_id: int, view_param: str = None): + """Assert that a URL has the correct Workbench structure.""" + assert "index.html" in url + assert "form=main_interface" in url + assert "action=scanview" in url + assert f"sid={scan_id}" in url + + if view_param: + assert f"current_view={view_param}" in url + + # Should not contain /api.php + assert "/api.php" not in url -def test_fetch_results_vulnerabilities(mock_workbench, mock_params): - """Test fetching vulnerability results.""" - mock_params.show_vulnerabilities = True - mock_workbench.list_vulnerabilities.return_value = [{"cve": "CVE-2021-1234"}] - - result = fetch_results(mock_workbench, mock_params, "TEST_SCAN") - - assert "vulnerabilities" in result - mock_workbench.list_vulnerabilities.assert_called_once_with("TEST_SCAN") -def test_fetch_results_api_error(mock_workbench, mock_params): - """Test handling API errors during result fetching.""" - mock_params.show_licenses = True - mock_workbench.get_dependency_analysis_results.side_effect = ApiError("API Error") - mock_workbench.get_scan_identified_licenses.return_value = [{"identifier": "MIT", "name": "MIT License"}] - - # Should not raise, should return partial results - result = fetch_results(mock_workbench, mock_params, "TEST_SCAN") - - # Should return kb_licenses since that call succeeded - assert "kb_licenses" in result +def assert_link_data_structure(link_data: Dict[str, str]): + """Assert that link data has the correct structure.""" + assert isinstance(link_data, dict) + assert len(link_data) == 2 + assert set(link_data.keys()) == {"url", "message"} + + # Values should be non-empty strings + assert isinstance(link_data["url"], str) + assert isinstance(link_data["message"], str) + assert len(link_data["url"]) > 0 + assert len(link_data["message"]) > 0 -# --- Tests for display_results --- -def test_display_results_empty(mock_params): - """Test displaying empty results.""" - result = display_results({}, mock_params) - assert result is False # No results to display +# ============================================================================ +# DURATION FORMATTING TESTS +# ============================================================================ -def test_display_results_with_data(mock_params): - """Test displaying results with data.""" - # Need to set 
the appropriate flags for the data to be displayed - mock_params.show_dependencies = True - mock_params.show_vulnerabilities = True - results = { - "dependency_analysis": [{"name": "test", "version": "1.0", "license_identifier": "MIT"}], - "vulnerabilities": [{"cve": "CVE-2021-1234", "severity": "HIGH", "component_name": "test", "component_version": "1.0"}] - } - - result = display_results(results, mock_params) - assert result is True - -# --- Tests for fetch_display_save_results --- -@patch('workbench_cli.utilities.scan_workflows.fetch_results') -@patch('workbench_cli.utilities.scan_workflows.display_results') -@patch('workbench_cli.utilities.scan_workflows.save_results_to_file') -def test_fetch_display_save_results_complete(mock_save, mock_display, mock_fetch, mock_workbench, mock_params): - """Test complete fetch, display, and save workflow.""" - mock_params.path_result = "output.json" - mock_params.show_licenses = True # Need at least one flag set for display - mock_fetch.return_value = {"test": "data"} - mock_display.return_value = True - - fetch_display_save_results(mock_workbench, mock_params, "TEST_SCAN") - - mock_fetch.assert_called_once_with(mock_workbench, mock_params, "TEST_SCAN") - mock_display.assert_called_once_with({"test": "data"}, mock_params) - mock_save.assert_called_once_with("output.json", {"test": "data"}, "TEST_SCAN") - -@patch('workbench_cli.utilities.scan_workflows.fetch_results') -@patch('workbench_cli.utilities.scan_workflows.display_results') -def test_fetch_display_save_results_no_save(mock_display, mock_fetch, mock_workbench, mock_params): - """Test fetch and display without saving.""" - mock_params.path_result = None - mock_params.show_licenses = True # Need at least one flag set for display - mock_fetch.return_value = {"test": "data"} - mock_display.return_value = True - - fetch_display_save_results(mock_workbench, mock_params, "TEST_SCAN") - - mock_fetch.assert_called_once_with(mock_workbench, mock_params, "TEST_SCAN") - mock_display.assert_called_once_with({"test": "data"}, mock_params) - -# --- Tests for print_operation_summary --- -def test_print_operation_summary_basic(mock_params): - """Test basic operation summary.""" - mock_params.command = "scan" - - print_operation_summary(mock_params, True, "PROJ_CODE", "SCAN_CODE") - # Should complete without errors - -def test_print_operation_summary_with_durations(mock_params): - """Test operation summary with timing information.""" - mock_params.command = "scan" - durations = {"kb_scan": 120.5, "dependency_analysis": 60.0} - - print_operation_summary(mock_params, True, "PROJ_CODE", "SCAN_CODE", durations) - # Should complete without errors - -def test_print_operation_summary_da_failed(mock_params): - """Test operation summary when DA failed.""" - mock_params.command = "scan" - - print_operation_summary(mock_params, False, "PROJ_CODE", "SCAN_CODE") - # Should complete without errors \ No newline at end of file +class TestFormatDuration: + """Test cases for the format_duration function.""" + + @pytest.mark.parametrize("seconds, expected", DURATION_TEST_CASES) + def test_format_duration_variations(self, seconds, expected): + """Test format_duration with various input types and values.""" + assert format_duration(seconds) == expected + + def test_format_duration_edge_cases(self): + """Test format_duration with edge cases.""" + # Test very large numbers + assert format_duration(86400) == "1440 minutes" # 24 hours + + # Test zero and negative (though negative shouldn't happen in practice) + assert 
format_duration(0) == "0 seconds" + +# ============================================================================ +# FILE OPERATIONS TESTS +# ============================================================================ + +class TestSaveResultsToFile: + """Test cases for the save_results_to_file function.""" + + @patch("builtins.open", new_callable=mock_open) + @patch("os.makedirs") + def test_save_success(self, mock_makedirs, mock_open_file): + """Test successful file saving.""" + filepath = "output/results.json" + results = {"scan_id": TEST_SCAN_ID, "status": "completed"} + + save_results_to_file(filepath, results, TEST_SCAN_CODE) + + mock_makedirs.assert_called_once_with("output", exist_ok=True) + mock_open_file.assert_any_call(filepath, 'w', encoding='utf-8') + + # Verify JSON content + handle = mock_open_file() + written = "".join(call_arg[0][0] for call_arg in handle.write.call_args_list) + assert json.loads(written) == results + + @patch("os.makedirs", side_effect=OSError("Permission denied")) + def test_save_makedirs_failure(self, mock_makedirs): + """Test handling of directory creation failure.""" + filepath = "restricted/results.json" + results = {"test": "data"} + + # Should not raise exception + save_results_to_file(filepath, results, TEST_SCAN_CODE) + mock_makedirs.assert_called_once_with("restricted", exist_ok=True) + + @patch("builtins.open", new_callable=mock_open) + @patch("os.makedirs") + def test_save_write_failure(self, mock_makedirs, mock_open_file): + """Test handling of file write failure.""" + filepath = "output/results.json" + results = {"test": "data"} + + # Simulate write error + handle = mock_open_file() + handle.write.side_effect = IOError("Disk full") + + # Should not raise exception + save_results_to_file(filepath, results, TEST_SCAN_CODE) + mock_makedirs.assert_called_once_with("output", exist_ok=True) + +# ============================================================================ +# SCAN STATUS MANAGEMENT TESTS +# ============================================================================ + +class TestAssertScanIsIdle: + """Test cases for the assert_scan_is_idle function.""" + + def test_all_processes_idle(self, mock_workbench, mock_params): + """Test when all requested processes are already idle.""" + mock_workbench.check_status_download_content_from_git.return_value = "FINISHED" + mock_workbench.get_scan_status.return_value = create_mock_status_response("FINISHED") + mock_workbench._standard_scan_status_accessor.return_value = "FINISHED" + + # Should complete without raising + assert_scan_is_idle(mock_workbench, TEST_SCAN_CODE, mock_params, ["GIT_CLONE", "SCAN"]) + + # Verify correct API calls + mock_workbench.check_status_download_content_from_git.assert_called_with(TEST_SCAN_CODE) + mock_workbench.get_scan_status.assert_called_with("SCAN", TEST_SCAN_CODE) + + def test_scan_not_found_graceful_handling(self, mock_workbench, mock_params): + """Test graceful handling when scan is not found.""" + mock_workbench.check_status_download_content_from_git.side_effect = ScanNotFoundError("Scan not found") + + # Should not raise exception + assert_scan_is_idle(mock_workbench, TEST_SCAN_CODE, mock_params, ["GIT_CLONE"]) + + def test_api_error_propagation(self, mock_workbench, mock_params): + """Test that API errors are properly propagated.""" + mock_workbench.check_status_download_content_from_git.side_effect = ApiError("API temporarily unavailable") + + with pytest.raises(ProcessError, match="Failed to check status"): + assert_scan_is_idle(mock_workbench, 
TEST_SCAN_CODE, mock_params, ["GIT_CLONE"]) + + def test_extract_archives_not_supported(self, mock_workbench, mock_params): + """Test handling when EXTRACT_ARCHIVES status checking is not supported.""" + mock_workbench._is_status_check_supported.return_value = False + + # Should complete without errors + assert_scan_is_idle(mock_workbench, TEST_SCAN_CODE, mock_params, ["EXTRACT_ARCHIVES"]) + mock_workbench._is_status_check_supported.assert_called_with(TEST_SCAN_CODE, "EXTRACT_ARCHIVES") + + +class TestWaitForScanCompletion: + """Test cases for the wait_for_scan_completion function.""" + + def test_both_scans_already_finished(self, mock_workbench, mock_params): + """Test when both KB scan and DA are already finished.""" + mock_workbench.get_scan_status.return_value = create_mock_status_response("FINISHED") + mock_workbench._standard_scan_status_accessor.return_value = "FINISHED" + + scan_completed, da_completed, durations = wait_for_scan_completion(mock_workbench, mock_params, TEST_SCAN_CODE) + + assert scan_completed is True + assert da_completed is True + assert "kb_scan" in durations + assert "dependency_analysis" in durations + + def test_kb_scan_failed(self, mock_workbench, mock_params): + """Test when KB scan has failed.""" + mock_workbench.get_scan_status.return_value = create_mock_status_response("FAILED") + mock_workbench._standard_scan_status_accessor.return_value = "FAILED" + + scan_completed, da_completed, durations = wait_for_scan_completion(mock_workbench, mock_params, TEST_SCAN_CODE) + + assert scan_completed is False + assert da_completed is False + + def test_dependency_analysis_not_run(self, mock_workbench, mock_params): + """Test when DA has not been run (status = NEW).""" + mock_workbench.get_scan_status.side_effect = [ + create_mock_status_response("FINISHED"), # KB scan + create_mock_status_response("NEW") # DA + ] + mock_workbench._standard_scan_status_accessor.side_effect = ["FINISHED", "NEW"] + + scan_completed, da_completed, durations = wait_for_scan_completion(mock_workbench, mock_params, TEST_SCAN_CODE) + + assert scan_completed is True + assert da_completed is False + +# ============================================================================ +# SCAN CONFIGURATION TESTS +# ============================================================================ + +class TestDetermineScansToRun: + """Test cases for the determine_scans_to_run function.""" + + def test_default_configuration(self, mock_params): + """Test default behavior - only KB scan.""" + mock_params.run_dependency_analysis = False + mock_params.dependency_analysis_only = False + + result = determine_scans_to_run(mock_params) + + assert result == {"run_kb_scan": True, "run_dependency_analysis": False} + + def test_with_dependency_analysis(self, mock_params): + """Test with dependency analysis enabled.""" + mock_params.run_dependency_analysis = True + mock_params.dependency_analysis_only = False + + result = determine_scans_to_run(mock_params) + + assert result == {"run_kb_scan": True, "run_dependency_analysis": True} + + def test_dependency_analysis_only(self, mock_params): + """Test with dependency analysis only.""" + mock_params.run_dependency_analysis = False + mock_params.dependency_analysis_only = True + + result = determine_scans_to_run(mock_params) + + assert result == {"run_kb_scan": False, "run_dependency_analysis": True} + + def test_conflicting_flags_resolved(self, mock_params): + """Test that conflicting flags are resolved (DA only takes precedence).""" + mock_params.run_dependency_analysis = 
True + mock_params.dependency_analysis_only = True + + result = determine_scans_to_run(mock_params) + + assert result == {"run_kb_scan": False, "run_dependency_analysis": True} + +# ============================================================================ +# RESULTS PROCESSING TESTS +# ============================================================================ + +class TestFetchResults: + """Test cases for the fetch_results function.""" + + def test_no_flags_set(self, mock_workbench, mock_params): + """Test when no result flags are set.""" + result = fetch_results(mock_workbench, mock_params, TEST_SCAN_CODE) + assert result == {} + + def test_fetch_license_results(self, mock_workbench, mock_params): + """Test fetching license results.""" + mock_params.show_licenses = True + mock_workbench.get_dependency_analysis_results.return_value = [SAMPLE_DEPENDENCY_DATA] + + result = fetch_results(mock_workbench, mock_params, TEST_SCAN_CODE) + + assert "dependency_analysis" in result + mock_workbench.get_dependency_analysis_results.assert_called_once_with(TEST_SCAN_CODE) + + def test_fetch_vulnerabilities(self, mock_workbench, mock_params): + """Test fetching vulnerability results.""" + mock_params.show_vulnerabilities = True + mock_workbench.list_vulnerabilities.return_value = [SAMPLE_VULNERABILITY_DATA] + + result = fetch_results(mock_workbench, mock_params, TEST_SCAN_CODE) + + assert "vulnerabilities" in result + mock_workbench.list_vulnerabilities.assert_called_once_with(TEST_SCAN_CODE) + + def test_api_error_handling(self, mock_workbench, mock_params): + """Test graceful handling of API errors during result fetching.""" + mock_params.show_licenses = True + mock_workbench.get_dependency_analysis_results.side_effect = ApiError("Service unavailable") + mock_workbench.get_scan_identified_licenses.return_value = [SAMPLE_LICENSE_DATA] + + # Should not raise, should return partial results + result = fetch_results(mock_workbench, mock_params, TEST_SCAN_CODE) + + # Should return kb_licenses since that call succeeded + assert "kb_licenses" in result + + +class TestDisplayResults: + """Test cases for the display_results function.""" + + def test_empty_results(self, mock_params): + """Test displaying empty results.""" + result = display_results({}, mock_params) + assert result is False # No results to display + + def test_display_with_data(self, mock_params, sample_results_data): + """Test displaying results with actual data.""" + mock_params.show_dependencies = True + mock_params.show_vulnerabilities = True + + result = display_results(sample_results_data, mock_params) + assert result is True + + +class TestFetchDisplaySaveResults: + """Test cases for the fetch_display_save_results orchestration function.""" + + @patch('workbench_cli.utilities.scan_workflows.fetch_results') + @patch('workbench_cli.utilities.scan_workflows.display_results') + @patch('workbench_cli.utilities.scan_workflows.save_results_to_file') + def test_complete_workflow(self, mock_save, mock_display, mock_fetch, mock_workbench, mock_params): + """Test complete fetch, display, and save workflow.""" + mock_params.path_result = "output.json" + mock_params.show_licenses = True + mock_fetch.return_value = {"test": "data"} + mock_display.return_value = True + + fetch_display_save_results(mock_workbench, mock_params, TEST_SCAN_CODE) + + mock_fetch.assert_called_once_with(mock_workbench, mock_params, TEST_SCAN_CODE) + mock_display.assert_called_once_with({"test": "data"}, mock_params) + mock_save.assert_called_once_with("output.json", 
{"test": "data"}, TEST_SCAN_CODE) + + @patch('workbench_cli.utilities.scan_workflows.fetch_results') + @patch('workbench_cli.utilities.scan_workflows.display_results') + def test_no_save_specified(self, mock_display, mock_fetch, mock_workbench, mock_params): + """Test fetch and display without saving.""" + mock_params.path_result = None + mock_params.show_licenses = True + mock_fetch.return_value = {"test": "data"} + mock_display.return_value = True + + fetch_display_save_results(mock_workbench, mock_params, TEST_SCAN_CODE) + + mock_fetch.assert_called_once_with(mock_workbench, mock_params, TEST_SCAN_CODE) + mock_display.assert_called_once_with({"test": "data"}, mock_params) + +# ============================================================================ +# OPERATION SUMMARY TESTS +# ============================================================================ + +class TestPrintOperationSummary: + """Test cases for the print_operation_summary function.""" + + def test_basic_summary(self, mock_params): + """Test basic operation summary.""" + mock_params.command = "scan" + + # Should complete without errors + print_operation_summary(mock_params, True, TEST_PROJECT_CODE, TEST_SCAN_CODE) + + def test_summary_with_durations(self, mock_params): + """Test operation summary with timing information.""" + mock_params.command = "scan" + durations = {"kb_scan": 120.5, "dependency_analysis": 60.0} + + # Should complete without errors + print_operation_summary(mock_params, True, TEST_PROJECT_CODE, TEST_SCAN_CODE, durations) + + def test_summary_when_da_failed(self, mock_params): + """Test operation summary when dependency analysis failed.""" + mock_params.command = "scan" + + # Should complete without errors + print_operation_summary(mock_params, False, TEST_PROJECT_CODE, TEST_SCAN_CODE) + +# ============================================================================ +# WORKBENCH LINKS TESTS +# ============================================================================ + +class TestGetWorkbenchLinks: + """Comprehensive test cases for the get_workbench_links function.""" + + def test_basic_link_generation(self): + """Test basic link generation with standard API URL.""" + links = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + # Should return all expected link types + assert set(links.keys()) == {"main", "pending", "policy"} + + # Each link should have correct structure + for link_type, link_data in links.items(): + assert_link_data_structure(link_data) + + def test_url_structure_correctness(self): + """Test that generated URLs have correct structure.""" + links = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + # Test main link (no current_view parameter) + main_url = links["main"]["url"] + expected_main = f"{TEST_BASE_URL}/index.html?form=main_interface&action=scanview&sid={TEST_SCAN_ID}" + assert main_url == expected_main + + # Test pending link (with current_view=pending_items) + pending_url = links["pending"]["url"] + expected_pending = f"{expected_main}¤t_view=pending_items" + assert pending_url == expected_pending + + # Test policy link (with current_view=mark_as_identified) + policy_url = links["policy"]["url"] + expected_policy = f"{expected_main}¤t_view=mark_as_identified" + assert policy_url == expected_policy + + def test_message_correctness(self): + """Test that generated messages match expectations.""" + links = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + for link_type, expected_message in EXPECTED_MESSAGES.items(): + assert links[link_type]["message"] == expected_message + 
+ @pytest.mark.parametrize("api_url", API_URL_VARIANTS) + def test_api_url_variants(self, api_url): + """Test that function handles various API URL formats correctly.""" + links = get_workbench_links(api_url, TEST_SCAN_ID) + + # All URLs should be properly formatted regardless of input + for link_type, link_data in links.items(): + url = link_data["url"] + assert_url_structure(url, TEST_SCAN_ID) + + def test_scan_id_type_handling(self): + """Test that function handles different scan_id types.""" + # Test with integer + links_int = get_workbench_links(TEST_API_URL, 123) + assert "sid=123" in links_int["main"]["url"] + + # Test with string + links_str = get_workbench_links(TEST_API_URL, "456") + assert "sid=456" in links_str["main"]["url"] + + def test_result_consistency(self): + """Test that multiple calls return consistent results.""" + links1 = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + links2 = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + assert links1 == links2 + + def test_base_url_stripping_variations(self): + """Test that /api.php is properly stripped from various URL formats.""" + test_cases = [ + ("https://example.com/api.php", "https://example.com"), + ("https://example.com/api.php/", "https://example.com"), + ("https://example.com/fossid/api.php", "https://example.com/fossid"), + ("https://example.com/path/to/api.php", "https://example.com/path/to"), + ] + + for input_url, expected_base in test_cases: + links = get_workbench_links(input_url, TEST_SCAN_ID) + main_url = links["main"]["url"] + assert main_url.startswith(f"{expected_base}/index.html") + + def test_required_url_elements_present(self): + """Test that all links contain required URL elements.""" + links = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + required_params = [ + "form=main_interface", + "action=scanview", + f"sid={TEST_SCAN_ID}" + ] + + # All links should contain these base parameters + for link_type, link_data in links.items(): + url = link_data["url"] + for param in required_params: + assert param in url, f"Missing '{param}' in {link_type} URL: {url}" + + def test_view_parameters_correctness(self): + """Test that view parameters are correctly added to URLs.""" + links = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + # Main link should NOT have current_view parameter + assert "current_view" not in links["main"]["url"] + + # Pending link should have current_view=pending_items + assert "current_view=pending_items" in links["pending"]["url"] + + # Policy link should have current_view=mark_as_identified + assert "current_view=mark_as_identified" in links["policy"]["url"] + + def test_data_structure_compliance(self): + """Test the exact structure of returned data.""" + links = get_workbench_links(TEST_API_URL, TEST_SCAN_ID) + + # Should be a dictionary with exactly 3 keys + assert isinstance(links, dict) + assert len(links) == 3 + assert set(links.keys()) == {"main", "pending", "policy"} + + # Each value should be a dict with exactly 2 keys + for link_type, link_data in links.items(): + assert_link_data_structure(link_data) \ No newline at end of file