Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ results.json

# Optional: Test data if not intended to be part of the repo
da-analyzer-results
sbom-test-data

# Optional: User-specific test scripts if not shared
test-commands.txt
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ requires-python = ">=3.9"
dependencies = [
"requests>=2.20.0",
"GitPython>=3.1.40",
"spdx-tools>=0.8.0",
"cyclonedx-python-lib[validation]>=7.0.0",
]

[project.scripts]
Expand Down
6 changes: 5 additions & 1 deletion src/workbench_cli/api/helpers/api_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def _send_request(self, payload: dict, timeout: int = 1800) -> dict:
except requests.exceptions.Timeout as e:
logger.error("API request timed out: %s", e, exc_info=True)
raise NetworkError("Request to API server timed out", details={"error": str(e)})
except requests.exceptions.RequestException as e:
# Handle network-level errors (e.g., DNS failure, refused connection)
raise NetworkError(f"Network error while calling API: {e}") from e

except requests.exceptions.RequestException as e:
logger.error("API request failed: %s", e, exc_info=True)
raise NetworkError(f"API request failed: {str(e)}", details={"error": str(e)})
raise NetworkError(f"API request failed: {str(e)}", details={"error": str(e)})
3 changes: 3 additions & 0 deletions src/workbench_cli/api/helpers/process_waiters.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ def wait_for_scan_to_finish(
elif scan_type == "DEPENDENCY_ANALYSIS":
operation_name = "Dependency Analysis"
should_track_files = False
elif scan_type == "REPORT_IMPORT":
operation_name = "SBOM Import"
should_track_files = False
else:
raise ValueError(f"Unsupported scan type: {scan_type}")

Expand Down
10 changes: 8 additions & 2 deletions src/workbench_cli/api/helpers/project_scan_resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def resolve_project(self, project_name: str, create_if_missing: bool = False) ->

raise ProjectNotFoundError(f"Project '{project_name}' not found")

def resolve_scan(self, scan_name: str, project_name: Optional[str], create_if_missing: bool, params: argparse.Namespace) -> Tuple[str, int]:
def resolve_scan(self, scan_name: str, project_name: Optional[str], create_if_missing: bool, params: argparse.Namespace, import_from_report: bool = False) -> Tuple[str, int]:
"""Find a scan by name, optionally creating it if not found."""
if project_name:
# Look in specific project
Expand All @@ -64,7 +64,13 @@ def resolve_scan(self, scan_name: str, project_name: Optional[str], create_if_mi
# Create if requested
if create_if_missing:
print(f"Creating scan '{scan_name}' in project '{project_name}'...")
self.create_webapp_scan(project_code=project_code, scan_name=scan_name, **self._get_git_params(params))
git_params = self._get_git_params(params)
self.create_webapp_scan(
project_code=project_code,
scan_name=scan_name,
import_from_report=import_from_report,
**git_params
)
time.sleep(2) # Brief wait for creation to process

# Get the newly created scan
Expand Down
164 changes: 91 additions & 73 deletions src/workbench_cli/api/helpers/scan_status_checkers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import requests
from typing import Dict, Any
import argparse
from typing import Dict, Any, List
from ...exceptions import (
ApiError,
NetworkError,
Expand Down Expand Up @@ -133,79 +134,7 @@ def _standard_scan_status_accessor(self, data: Dict[str, Any]) -> str:
logger.warning(f"Error accessing status keys in data: {data}", exc_info=True)
return "ACCESS_ERROR" # Use the ACCESS_ERROR state

def ensure_process_can_start(
self,
process_type: str,
scan_code: str,
wait_max_tries: int,
wait_interval: int
):
"""
Checks if a SCAN or DEPENDENCY_ANALYSIS can be started.
If the process is currently QUEUED or RUNNING, it waits for it to finish.

Args:
process_type: Type of process to check (SCAN or DEPENDENCY_ANALYSIS)
scan_code: Code of the scan to check
wait_max_tries: Max attempts to wait if process is running/queued.
wait_interval: Seconds between wait attempts.

Raises:
CompatibilityError: If the process cannot be started due to incompatible state
ProcessError: If there are process-related issues
ApiError: If there are API issues
NetworkError: If there are network issues
ScanNotFoundError: If the scan doesn't exist
"""
process_type_upper = process_type.upper()
if process_type_upper not in ["SCAN", "DEPENDENCY_ANALYSIS"]:
raise ValueError(f"Invalid process_type '{process_type}' provided to ensure_process_can_start.")

try:
scan_status = self.get_scan_status(process_type, scan_code)
# Use the standard accessor for consistent status checking
current_status = self._standard_scan_status_accessor(scan_status)

# If queued or running, wait for it to finish first
if current_status in ["QUEUED", "RUNNING"]:
print() # Newline before waiting message
print(f"Existing {process_type} for '{scan_code}' is {current_status}. Waiting for it to complete...")
logger.info(f"Existing {process_type} for '{scan_code}' is {current_status}. Waiting...")
try:
self.wait_for_scan_to_finish(process_type, scan_code, wait_max_tries, wait_interval)
print(f"Previous {process_type} for '{scan_code}' finished. Proceeding...")
logger.info(f"Previous {process_type} for '{scan_code}' finished.")
# No need to re-check status, wait_for_scan handles terminal states
return # Allow proceeding
except (ProcessTimeoutError, ProcessError) as wait_err:
# If waiting failed, we cannot start the new process
raise ProcessError(f"Could not start {process_type} for '{scan_code}' because waiting for the existing process failed: {wait_err}", details=getattr(wait_err, 'details', None)) from wait_err

# Allow starting if NEW, FINISHED, FAILED, or CANCELLED
allowed_statuses = ["NEW", "FINISHED", "FAILED", "CANCELLED"]
if current_status not in allowed_statuses:
raise CompatibilityError(
f"Cannot start {process_type.lower()} for '{scan_code}'. Current status is {current_status} (Must be one of {allowed_statuses})."
)
logger.debug(f"The {process_type} for '{scan_code}' can start (Current status: {current_status}).")
except (ApiError, NetworkError, ScanNotFoundError, CompatibilityError):
raise
except (ProcessError, ProcessTimeoutError):
# Re-raise process-related errors without wrapping them
raise
except Exception as e:
raise ProcessError(f"Could not verify if {process_type.lower()} can start for '{scan_code}'", details={"error": str(e)})

def _get_process_status(self, process_type: str, scan_code: str) -> str:
"""Helper to get status for a given process type."""

if process_type not in self.PROCESS_STATUS_MAP:
raise ValueError(f"Invalid process_type '{process_type}' provided to ensure_process_can_start.")

status_method = self.PROCESS_STATUS_MAP[process_type]
status_data = status_method(scan_code)

return status_data.get("status", "UNKNOWN")

def get_scan_status(self, scan_type: str, scan_code: str) -> dict:
"""
Expand All @@ -225,3 +154,92 @@ def get_scan_status(self, scan_type: str, scan_code: str) -> dict:
NotImplementedError: If called on the base class
"""
raise NotImplementedError("get_scan_status must be implemented by subclasses")

def ensure_scan_is_idle(
    self,
    scan_code: str,
    params: argparse.Namespace,
    process_types_to_check: List[str]
) -> None:
    """
    Ensures specified background processes for a scan are idle (not RUNNING or QUEUED).
    If a process is running/queued, waits for it to finish before proceeding.

    This method can handle multiple process types at once and supports various process types
    including SCAN, DEPENDENCY_ANALYSIS, GIT_CLONE, EXTRACT_ARCHIVES, and REPORT_IMPORT.
    It loops in full passes: whenever it has to wait on one process, it restarts the pass
    and re-checks every requested process from scratch, and only returns once a single
    pass finds all of them idle.

    Args:
        scan_code: Code of the scan to check
        params: Command line parameters containing retry settings
            (reads params.scan_number_of_tries and params.scan_wait_time)
        process_types_to_check: List of process types to check (e.g., ["SCAN", "DEPENDENCY_ANALYSIS"])

    Raises:
        ProcessError: If there are process-related issues
        ApiError: If there are API issues
        NetworkError: If there are network issues
    """
    logger.debug(f"Asserting idle status for processes {process_types_to_check} on scan '{scan_code}'...")
    # Outer loop: repeat whole passes until one pass sees every process idle.
    while True:
        all_processes_idle_this_pass = True
        logger.debug("Starting a new pass to check idle status...")
        for process_type in process_types_to_check:
            process_type_upper = process_type.upper()
            logger.debug(f"Checking status for process type: {process_type_upper}")
            current_status = "UNKNOWN"
            try:
                # Each process type has its own status source.
                if process_type_upper == "GIT_CLONE":
                    current_status = self.check_status_download_content_from_git(scan_code).upper()
                elif process_type_upper in ["SCAN", "DEPENDENCY_ANALYSIS", "REPORT_IMPORT"]:
                    status_data = self.get_scan_status(process_type_upper, scan_code)
                    current_status = status_data.get("status", "UNKNOWN").upper()
                elif process_type_upper == "EXTRACT_ARCHIVES":
                    # EXTRACT_ARCHIVES status checking is handled differently
                    # Check if status checking is supported for this process type
                    if self._is_status_check_supported(scan_code, "EXTRACT_ARCHIVES"):
                        # Use the specialized method for checking archive extraction status
                        try:
                            status_data = self.get_scan_status("EXTRACT_ARCHIVES", scan_code)
                            current_status = self._standard_scan_status_accessor(status_data)
                        except (ApiError, ScanNotFoundError) as e:
                            # Best-effort: if the server can't answer, don't block the caller.
                            logger.debug(f"Could not check EXTRACT_ARCHIVES status, assuming finished: {e}")
                            current_status = "FINISHED"
                    else:
                        logger.debug(f"EXTRACT_ARCHIVES status checking not supported. Assuming idle.")
                        current_status = "FINISHED"
                else:
                    # Unknown types are skipped rather than failing the whole check.
                    logger.warning(f"Unknown process type '{process_type_upper}' requested for idle check. Skipping.")
                    continue
                logger.debug(f"Current status for {process_type_upper}: {current_status}")
            except ScanNotFoundError:
                # A scan that doesn't exist has no running processes — treat as idle.
                logger.debug(f"Scan '{scan_code}' not found during idle check for {process_type_upper}. Assuming idle.")
                print(f" - {process_type_upper}: Not found (considered idle).")
                continue
            except (ApiError, NetworkError) as e:
                raise ProcessError(f"Cannot proceed: Failed to check status for {process_type_upper} due to API/Network error: {e}") from e
            except Exception as e:
                raise ProcessError(f"Cannot proceed: Unexpected error checking status for {process_type_upper}: {e}") from e

            # "NOT FINISHED" is a raw status string some endpoints return; it is
            # treated the same as RUNNING/QUEUED (i.e., still busy).
            if current_status in ["RUNNING", "QUEUED", "NOT FINISHED"]:
                all_processes_idle_this_pass = False
                print(f" - {process_type_upper}: Status is {current_status}. Waiting for completion...")
                try:
                    # Dispatch to the waiter matching the process type; return
                    # values are discarded — only completion matters here.
                    if process_type_upper == "GIT_CLONE":
                        _, _ = self.wait_for_git_clone(scan_code, params.scan_number_of_tries, params.scan_wait_time)
                    elif process_type_upper == "EXTRACT_ARCHIVES":
                        _, _ = self.wait_for_archive_extraction(scan_code, params.scan_number_of_tries, params.scan_wait_time)
                    else:
                        _, _ = self.wait_for_scan_to_finish(process_type_upper, scan_code, params.scan_number_of_tries, params.scan_wait_time)
                    print(f" - {process_type_upper}: Previous run finished.")
                    logger.debug(f"Breaking inner loop after waiting for {process_type_upper} to re-check all statuses.")
                    # Restart the pass: a finished process may have kicked off others.
                    break
                except (ProcessTimeoutError, ProcessError) as wait_err:
                    raise ProcessError(f"Cannot proceed: Waiting for existing {process_type_upper} failed: {wait_err}") from wait_err
                except Exception as wait_exc:
                    raise ProcessError(f"Cannot proceed: Unexpected error waiting for {process_type_upper}: {wait_exc}") from wait_exc
            else:
                print(f" - {process_type_upper}: Status is {current_status} (considered idle).")

        # Only exit once a complete pass finished without waiting on anything.
        if all_processes_idle_this_pass:
            logger.debug("All processes confirmed idle in this pass. Exiting check loop.")
            break
    print("All Scan processes confirmed idle! Proceeding...")
2 changes: 1 addition & 1 deletion src/workbench_cli/api/helpers/upload_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,4 @@ def _perform_upload(self, file_path: str, headers: dict):
raise NetworkError(f"Network error during file upload: {e}") from e
finally:
if file_handle and not file_handle.closed:
file_handle.close()
file_handle.close()
59 changes: 48 additions & 11 deletions src/workbench_cli/api/scans_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ def create_webapp_scan(
git_branch: Optional[str] = None,
git_tag: Optional[str] = None,
git_commit: Optional[str] = None,
git_depth: Optional[int] = None
git_depth: Optional[int] = None,
import_from_report: bool = False
) -> bool:
"""
Creates a new webapp scan inside a project, handling Git parameters as needed.
Expand All @@ -354,6 +355,7 @@ def create_webapp_scan(
git_tag: Optional tag name (if git_url is provided, alternative to branch).
git_commit: Optional commit hash (if git_url is provided, alternative to branch or tag).
git_depth: Optional git clone depth (if git_url is provided).
import_from_report: Whether to import the scan from an existing report

Returns:
True if the scan was successfully created, raises exception otherwise.
Expand All @@ -370,6 +372,11 @@ def create_webapp_scan(
"project_code": project_code,
}

# Add import_from_report parameter if specified
if import_from_report:
payload_data["import_from_report"] = "1"
logger.debug(" Setting scan for report import mode")

# --- Correct Git Parameter Handling ---
git_ref_value = None
git_ref_type = None
Expand Down Expand Up @@ -632,12 +639,13 @@ def run_scan(
NetworkError: If there are network issues
"""
try:
self.ensure_process_can_start(
"SCAN",
scan_code,
wait_max_tries=60, # Use a fixed reasonable default
wait_interval=30
# Create a minimal params namespace for ensure_scan_is_idle
import argparse
params_for_idle_check = argparse.Namespace(
scan_number_of_tries=60,
scan_wait_time=30
)
self.ensure_scan_is_idle(scan_code, params_for_idle_check, ["SCAN"])
except Exception as e:
logger.error(f"Pre-scan check failed for '{scan_code}': {e}")
raise
Expand Down Expand Up @@ -720,12 +728,13 @@ def start_dependency_analysis(self, scan_code: str, import_only: bool = False):
NetworkError: If there are network issues
"""
try:
self.ensure_process_can_start(
"DEPENDENCY_ANALYSIS",
scan_code,
wait_max_tries=60, # Use a fixed reasonable default
wait_interval=30
# Create a minimal params namespace for ensure_scan_is_idle
import argparse
params_for_idle_check = argparse.Namespace(
scan_number_of_tries=60,
scan_wait_time=30
)
self.ensure_scan_is_idle(scan_code, params_for_idle_check, ["DEPENDENCY_ANALYSIS"])
except Exception as e:
logger.error(f"Pre-analysis check failed for '{scan_code}': {e}")
raise
Expand Down Expand Up @@ -836,3 +845,31 @@ def check_scan_report_status(self, process_id: int, scan_code: str) -> Dict[str,
else:
error_msg = response.get("error", f"Unexpected response: {response}")
raise ApiError(f"Failed to check report status for process {process_id} (scan '{scan_code}'): {error_msg}", details=response)

def import_report(self, scan_code: str):
    """
    Kicks off an SBOM report import for the given scan.

    Sends the `scans -> import_report` API call and verifies the server
    accepted it; the import itself runs asynchronously on the server side.

    Args:
        scan_code: Code of the scan to import the report into

    Raises:
        ApiError: If there are API issues
        ScanNotFoundError: If the scan doesn't exist
        NetworkError: If there are network issues
    """
    logger.info(f"Starting SBOM report import for '{scan_code}'...")
    response = self._send_request({
        "group": "scans",
        "action": "import_report",
        "data": {"scan_code": scan_code},
    })

    # Success path first: the server signals acceptance with status "1".
    if response.get("status") == "1":
        logger.info(f"SBOM report import for '{scan_code}' started successfully.")
        return

    error_msg = response.get("error", "Unknown API error")
    # Map the server's "missing scan" markers onto the dedicated exception.
    if "Scan not found" in error_msg or "row_not_found" in error_msg:
        raise ScanNotFoundError(f"Scan '{scan_code}' not found")
    raise ApiError(f"Failed to start SBOM report import for '{scan_code}': {error_msg}", details=response)
Loading
Loading