diff --git a/pyproject.toml b/pyproject.toml index e955414..0fcc603 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "packageurl-python>=0.17.6", "conan>=2.0,<3", "questionary>=2.0.1,<3", + "pipdeptree>=2.0.0", ] [project.urls] diff --git a/sbomify_action/_dependency_expansion/__init__.py b/sbomify_action/_dependency_expansion/__init__.py new file mode 100644 index 0000000..7c739ed --- /dev/null +++ b/sbomify_action/_dependency_expansion/__init__.py @@ -0,0 +1,50 @@ +"""Transitive dependency expansion for SBOMs. + +This module provides functionality to discover transitive dependencies +that are missing from lockfiles (particularly requirements.txt) and +add them to SBOMs. It uses ecosystem-specific tools like pipdeptree +to inspect installed packages. + +Supported lockfiles: +- Python: requirements.txt (using pipdeptree) + +Example usage: + from sbomify_action._dependency_expansion import expand_sbom_dependencies + + result = expand_sbom_dependencies( + sbom_file="sbom.json", + lock_file="requirements.txt", + ) + print(f"Added {result.added_count} transitive dependencies") + +Note: + For pipdeptree to work, packages from requirements.txt must be + installed in the current Python environment. If packages are not + installed, expansion is skipped gracefully. +""" + +from .enricher import ( + DependencyEnricher, + create_default_registry, + expand_sbom_dependencies, + supports_dependency_expansion, +) +from .models import DiscoveredDependency, ExpansionResult, normalize_python_package_name +from .protocol import DependencyExpander +from .registry import ExpanderRegistry + +__all__ = [ + # Main API + "expand_sbom_dependencies", + "supports_dependency_expansion", + # Classes for advanced usage + "DependencyEnricher", + "ExpanderRegistry", + "DependencyExpander", + # Models + "DiscoveredDependency", + "ExpansionResult", + "normalize_python_package_name", + # Factory + "create_default_registry", +] diff --git a/sbomify_action/_dependency_expansion/enricher.py b/sbomify_action/_dependency_expansion/enricher.py new file mode 100644 index 0000000..79b997c --- /dev/null +++ b/sbomify_action/_dependency_expansion/enricher.py @@ -0,0 +1,313 @@ +"""Dependency expansion orchestration for SBOMs.""" + +import json +from pathlib import Path +from typing import Any + +from cyclonedx.model.bom import Bom +from cyclonedx.model.component import Component, ComponentType +from packageurl import PackageURL + +from ..console import get_audit_trail +from ..logging_config import logger +from ..serialization import serialize_cyclonedx_bom +from .expanders.pipdeptree import PipdeptreeExpander +from .models import DiscoveredDependency, ExpansionResult, normalize_python_package_name +from .registry import ExpanderRegistry + + +def create_default_registry() -> ExpanderRegistry: + """Create registry with default expanders.""" + registry = ExpanderRegistry() + registry.register(PipdeptreeExpander()) + return registry + + +class DependencyEnricher: + """Orchestrates dependency expansion for SBOMs. + + Uses registered expanders to discover transitive dependencies + and adds them to the SBOM. Records all additions to the audit trail. + """ + + def __init__(self, registry: ExpanderRegistry | None = None) -> None: + self._registry = registry or create_default_registry() + + def expand_sbom( + self, + sbom_file: str, + lock_file: str, + ) -> ExpansionResult: + """Expand SBOM with discovered transitive dependencies. 
+ + Args: + sbom_file: Path to SBOM file (modified in place) + lock_file: Path to lockfile that was used for generation + + Returns: + ExpansionResult with statistics + """ + sbom_path = Path(sbom_file) + lock_path = Path(lock_file) + + # Find applicable expander + expander = self._registry.get_expander_for(lock_path) + if not expander: + logger.debug(f"No expander supports {lock_path.name}") + return ExpansionResult( + original_count=0, + discovered_count=0, + added_count=0, + dependencies=[], + source="none", + ) + + # Check if expansion is possible + if not expander.can_expand(): + logger.info( + f"Dependency expansion skipped: {expander.name} cannot run " + "(packages may not be installed in environment)" + ) + return ExpansionResult( + original_count=0, + discovered_count=0, + added_count=0, + dependencies=[], + source=expander.name, + ) + + # Discover transitive dependencies + logger.info(f"Discovering transitive dependencies with {expander.name}...") + discovered = expander.expand(lock_path) + + if not discovered: + logger.info("No transitive dependencies discovered") + return ExpansionResult( + original_count=0, + discovered_count=0, + added_count=0, + dependencies=[], + source=expander.name, + ) + + # Load SBOM and detect format + with sbom_path.open("r") as f: + sbom_data = json.load(f) + + if sbom_data.get("bomFormat") == "CycloneDX": + result = self._enrich_cyclonedx(sbom_path, sbom_data, discovered, expander.name) + elif sbom_data.get("spdxVersion"): + result = self._enrich_spdx(sbom_path, sbom_data, discovered, expander.name) + else: + logger.warning("Unknown SBOM format, skipping dependency expansion") + result = ExpansionResult( + original_count=0, + discovered_count=len(discovered), + added_count=0, + dependencies=discovered, + source=expander.name, + ) + + return result + + def _enrich_cyclonedx( + self, + sbom_path: Path, + sbom_data: dict[str, Any], + discovered: list[DiscoveredDependency], + source: str, + ) -> ExpansionResult: + """Add discovered dependencies to CycloneDX SBOM.""" + bom = Bom.from_json(sbom_data) + original_count = len(bom.components) if bom.components else 0 + + # Build set of existing PURLs for deduplication + existing_purls: set[str] = set() + if bom.components: + for comp in bom.components: + if comp.purl: + existing_purls.add(str(comp.purl).lower()) + + # Add new components + added_count = 0 + audit = get_audit_trail() + + for dep in discovered: + # Skip if already exists + if dep.purl.lower() in existing_purls: + continue + + # Create new component + try: + purl_obj = PackageURL.from_string(dep.purl) + except Exception as e: + logger.warning(f"Invalid PURL {dep.purl}: {e}") + continue + + component = Component( + type=ComponentType.LIBRARY, + name=dep.name, + version=dep.version, + purl=purl_obj, + ) + + bom.components.add(component) + added_count += 1 + + # Record to audit trail + parent_info = f"discovered via {dep.parent}" if dep.parent else "discovered as transitive" + audit.record_enrichment( + component=dep.purl, + field="transitive_dependency", + value=parent_info, + source=source, + ) + + # Serialize back + spec_version = sbom_data.get("specVersion", "1.6") + serialized = serialize_cyclonedx_bom(bom, spec_version) + with sbom_path.open("w") as f: + f.write(serialized) + + logger.info( + f"Dependency expansion: added {added_count} transitive dependencies " + f"(discovered {len(discovered)}, {original_count} original)" + ) + + return ExpansionResult( + original_count=original_count, + discovered_count=len(discovered), + 
added_count=added_count, + dependencies=discovered, + source=source, + ) + + def _enrich_spdx( + self, + sbom_path: Path, + sbom_data: dict[str, Any], + discovered: list[DiscoveredDependency], + source: str, + ) -> ExpansionResult: + """Add discovered dependencies to SPDX SBOM.""" + packages = sbom_data.get("packages", []) + original_count = len(packages) + + # Build set of existing package identifiers (normalized for comparison) + existing: set[str] = set() + existing_spdx_ids: set[str] = set() + for pkg in packages: + name = pkg.get("name", "") + version = pkg.get("versionInfo", "") + normalized = normalize_python_package_name(name) + existing.add(f"{normalized}@{version}") + if pkg.get("SPDXID"): + existing_spdx_ids.add(pkg["SPDXID"]) + + # Add new packages + added_count = 0 + audit = get_audit_trail() + + for dep in discovered: + normalized_name = normalize_python_package_name(dep.name) + key = f"{normalized_name}@{dep.version}" + if key in existing: + continue + + # Create new SPDX package + # Generate a unique SPDXID. SPDX IDs must contain only letters, numbers, + # dots, and hyphens, and cannot start with a number. We normalize + # separators to hyphens; collision handling below ensures uniqueness + # within this document if two packages still normalize to the same ID. + safe_name = dep.name.replace("_", "-").replace(".", "-") + safe_version = dep.version.replace("_", "-") + base_spdx_id = f"SPDXRef-Package-{safe_name}-{safe_version}" + + # Handle potential collisions by adding a suffix + spdx_id = base_spdx_id + suffix = 1 + while spdx_id in existing_spdx_ids: + spdx_id = f"{base_spdx_id}-{suffix}" + suffix += 1 + existing_spdx_ids.add(spdx_id) + + new_package = { + "SPDXID": spdx_id, + "name": dep.name, + "versionInfo": dep.version, + "downloadLocation": "NOASSERTION", + "filesAnalyzed": False, + "externalRefs": [ + { + "referenceCategory": "PACKAGE-MANAGER", + "referenceType": "purl", + "referenceLocator": dep.purl, + } + ], + } + + packages.append(new_package) + added_count += 1 + + # Record to audit trail + parent_info = f"discovered via {dep.parent}" if dep.parent else "discovered as transitive" + audit.record_enrichment( + component=dep.purl, + field="transitive_dependency", + value=parent_info, + source=source, + ) + + sbom_data["packages"] = packages + + # Write back + with sbom_path.open("w") as f: + json.dump(sbom_data, f, indent=2) + + logger.info( + f"Dependency expansion: added {added_count} transitive dependencies " + f"(discovered {len(discovered)}, {original_count} original)" + ) + + return ExpansionResult( + original_count=original_count, + discovered_count=len(discovered), + added_count=added_count, + dependencies=discovered, + source=source, + ) + + +def expand_sbom_dependencies( + sbom_file: str, + lock_file: str, +) -> ExpansionResult: + """Expand SBOM with transitive dependencies from installed packages. + + This is the main public API for dependency expansion. + + Args: + sbom_file: Path to SBOM file (modified in place) + lock_file: Path to lockfile used for generation + + Returns: + ExpansionResult with statistics + """ + enricher = DependencyEnricher() + return enricher.expand_sbom(sbom_file, lock_file) + + +def supports_dependency_expansion(lock_file: str) -> bool: + """Check if dependency expansion is supported for this lockfile. + + Uses the registry to check if any expander supports the lockfile type. 
+ + Args: + lock_file: Path to lockfile + + Returns: + True if an expander supports this lockfile type + """ + registry = create_default_registry() + expander = registry.get_expander_for(Path(lock_file)) + return expander is not None diff --git a/sbomify_action/_dependency_expansion/expanders/__init__.py b/sbomify_action/_dependency_expansion/expanders/__init__.py new file mode 100644 index 0000000..7dd82cd --- /dev/null +++ b/sbomify_action/_dependency_expansion/expanders/__init__.py @@ -0,0 +1,5 @@ +"""Dependency expander implementations.""" + +from .pipdeptree import PipdeptreeExpander + +__all__ = ["PipdeptreeExpander"] diff --git a/sbomify_action/_dependency_expansion/expanders/pipdeptree.py b/sbomify_action/_dependency_expansion/expanders/pipdeptree.py new file mode 100644 index 0000000..da715c6 --- /dev/null +++ b/sbomify_action/_dependency_expansion/expanders/pipdeptree.py @@ -0,0 +1,254 @@ +"""Pipdeptree-based dependency expander for Python requirements.txt files.""" + +import json +import subprocess +from pathlib import Path + +from ...logging_config import logger +from ...tool_checks import check_tool_available +from ..models import DiscoveredDependency, normalize_python_package_name + +_PIPDEPTREE_AVAILABLE, _PIPDEPTREE_PATH = check_tool_available("pipdeptree") + + +class PipdeptreeExpander: + """Discovers Python transitive dependencies using pipdeptree. + + pipdeptree inspects installed packages in the current Python + environment and reports their dependency tree. This expander + uses it to find transitive dependencies that are NOT listed + in requirements.txt but are installed as dependencies of + packages that ARE listed. + + Requirements: + - pipdeptree must be installed + - Packages from requirements.txt must be installed in the environment + """ + + SUPPORTED_LOCK_FILES = ("requirements.txt",) + + @property + def name(self) -> str: + return "pipdeptree" + + @property + def priority(self) -> int: + return 10 # Native Python tool, high priority + + @property + def ecosystems(self) -> list[str]: + return ["pypi"] + + def supports(self, lock_file: Path) -> bool: + """Check if this expander supports the given lockfile.""" + if not _PIPDEPTREE_AVAILABLE: + return False + return lock_file.name in self.SUPPORTED_LOCK_FILES + + def can_expand(self) -> bool: + """Check if expansion is possible. + + pipdeptree works by inspecting installed packages, so we check + if pipdeptree can run at all (packages installed in environment). + """ + if not _PIPDEPTREE_AVAILABLE: + return False + + try: + result = subprocess.run( + ["pipdeptree", "--json-tree", "--warn", "silence"], + capture_output=True, + text=True, + timeout=30, + ) + return result.returncode == 0 + except Exception: + return False + + def expand(self, lock_file: Path) -> list[DiscoveredDependency]: + """Discover transitive dependencies using pipdeptree. + + Returns list of dependencies that are installed but NOT + in the original requirements.txt. + """ + # 1. Parse requirements.txt to get direct dependency names + direct_deps = self._parse_requirements(lock_file) + direct_names = {normalize_python_package_name(name) for name in direct_deps} + + logger.debug(f"Found {len(direct_names)} direct dependencies in {lock_file.name}") + + if not direct_deps: + return [] + + # 2. 
Run pipdeptree filtered to only the direct dependencies + # This ensures we only see trees for packages actually in requirements.txt + package_list = ",".join(direct_deps.keys()) + tree = self._run_pipdeptree(packages=package_list) + if not tree: + logger.debug("pipdeptree returned no results for the specified packages") + return [] + + logger.debug(f"pipdeptree returned {len(tree)} direct dependency trees") + + # 3. Find transitive dependencies (deps of direct packages that aren't in requirements.txt) + discovered: list[DiscoveredDependency] = [] + seen: set[str] = set() + + for pkg in tree: + # Start at depth=0 for the direct dependency, its children are depth=1 + self._collect_transitives( + pkg, + direct_names, + discovered, + seen, + depth=0, + ) + + logger.info(f"pipdeptree discovered {len(discovered)} transitive dependencies") + return discovered + + def _parse_requirements(self, lock_file: Path) -> dict[str, str | None]: + """Parse requirements.txt and return {name: version} dict.""" + deps: dict[str, str | None] = {} + + with open(lock_file, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + + # Skip comments and empty lines + if not line or line.startswith("#"): + continue + + # Skip options like -r, -e, --index-url, etc. + if line.startswith("-"): + continue + + # Handle environment markers (e.g., requests; python_version >= "3.6") + if ";" in line: + line = line.split(";")[0].strip() + + # Parse the requirement line + name, version = self._parse_requirement_line(line) + if name: + deps[name] = version + + return deps + + def _parse_requirement_line(self, line: str) -> tuple[str | None, str | None]: + """Parse a single requirement line. + + Handles formats like: + - requests + - requests==2.31.0 + - requests>=2.0,<3 + - requests[security]>=2.0 + """ + # Remove inline comments + if "#" in line: + line = line.split("#")[0].strip() + + if not line: + return None, None + + # Handle various specifiers: ==, >=, <=, ~=, !=, <, > + for op in ["==", ">=", "<=", "~=", "!=", "<", ">"]: + if op in line: + parts = line.split(op, 1) + name = parts[0].strip() + version = parts[1].strip() if op == "==" else None + + # Handle version ranges (e.g., "2.0,<3" -> just take first version) + if version and "," in version: + version = None # Can't determine exact version + + # Remove extras like [security] + if "[" in name: + name = name.split("[")[0] + + return name, version + + # No version specifier + name = line.strip() + if "[" in name: + name = name.split("[")[0] + + return name, None + + def _run_pipdeptree(self, packages: str | None = None) -> list[dict] | None: + """Run pipdeptree and return JSON tree. + + Args: + packages: Comma-separated list of package names to filter to. + If None, returns all packages in the environment. 
+ """ + cmd = ["pipdeptree", "--json-tree", "--warn", "silence"] + if packages: + cmd.extend(["--packages", packages]) + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=60, + ) + if result.returncode != 0: + logger.warning(f"pipdeptree failed: {result.stderr}") + return None + return json.loads(result.stdout) + except subprocess.TimeoutExpired: + logger.warning("pipdeptree timed out") + return None + except json.JSONDecodeError as e: + logger.warning(f"pipdeptree output not valid JSON: {e}") + return None + except Exception as e: + logger.warning(f"pipdeptree error: {e}") + return None + + def _collect_transitives( + self, + pkg: dict, + direct_names: set[str], + discovered: list[DiscoveredDependency], + seen: set[str], + depth: int, + parent: str | None = None, + ) -> None: + """Recursively collect transitive dependencies.""" + name = pkg.get("package_name", "") + if not name: + return + version = pkg.get("installed_version", "") + normalized = normalize_python_package_name(name) + + # Skip if already processed + pkg_key = f"{normalized}@{version}" + if pkg_key in seen: + return + seen.add(pkg_key) + + # If this is NOT a direct dependency and we're past the root level, + # it's a transitive dependency + if depth > 0 and normalized not in direct_names: + discovered.append( + DiscoveredDependency( + name=name, + version=version, + purl=f"pkg:pypi/{normalized}@{version}", + parent=parent, + depth=depth, + ecosystem="pypi", + ) + ) + + # Recurse into dependencies + for dep in pkg.get("dependencies", []): + self._collect_transitives( + dep, + direct_names, + discovered, + seen, + depth=depth + 1, + parent=name, + ) diff --git a/sbomify_action/_dependency_expansion/models.py b/sbomify_action/_dependency_expansion/models.py new file mode 100644 index 0000000..ed5875c --- /dev/null +++ b/sbomify_action/_dependency_expansion/models.py @@ -0,0 +1,58 @@ +"""Data models for dependency expansion.""" + +from dataclasses import dataclass + + +@dataclass +class DiscoveredDependency: + """Represents a discovered transitive dependency. + + Attributes: + name: Package name as reported by pipdeptree + version: Installed version + purl: Package URL (e.g., pkg:pypi/requests@2.31.0) + parent: Name of the package that required this dependency + depth: How many levels from a direct dependency (1 = direct transitive) + ecosystem: Package ecosystem identifier (default: pypi) + """ + + name: str + version: str + purl: str + parent: str | None = None + depth: int = 1 + ecosystem: str = "pypi" + + +@dataclass +class ExpansionResult: + """Result of dependency expansion. + + Attributes: + original_count: Number of components in SBOM before expansion + discovered_count: Total transitive dependencies discovered + added_count: Number actually added (excluding duplicates) + dependencies: List of discovered dependencies + source: Tool that discovered the dependencies (e.g., "pipdeptree") + """ + + original_count: int + discovered_count: int + added_count: int + dependencies: list[DiscoveredDependency] + source: str + + +def normalize_python_package_name(name: str) -> str: + """Normalize Python package name per PEP 503. + + PEP 503: Package names are case-insensitive and treat + hyphens, underscores, and dots as equivalent. 
+ + Args: + name: Package name to normalize + + Returns: + Normalized lowercase name with underscores + """ + return name.lower().replace("-", "_").replace(".", "_") diff --git a/sbomify_action/_dependency_expansion/protocol.py b/sbomify_action/_dependency_expansion/protocol.py new file mode 100644 index 0000000..f22ed92 --- /dev/null +++ b/sbomify_action/_dependency_expansion/protocol.py @@ -0,0 +1,99 @@ +"""Protocol definition for dependency expanders.""" + +from pathlib import Path +from typing import Protocol + +from .models import DiscoveredDependency + + +class DependencyExpander(Protocol): + """Protocol for dependency expansion plugins. + + Each expander implements this protocol to discover transitive + dependencies using ecosystem-specific tools. Expanders are + registered with ExpanderRegistry and selected based on the + lockfile type. + + Example: + class PipdeptreeExpander: + name = "pipdeptree" + priority = 10 + ecosystems = ["pypi"] + + def supports(self, lock_file: Path) -> bool: + return lock_file.name == "requirements.txt" + + def can_expand(self) -> bool: + # Check if packages are installed + ... + + def expand(self, lock_file: Path) -> list[DiscoveredDependency]: + # Discover transitive dependencies + ... + """ + + @property + def name(self) -> str: + """Human-readable name of this expander. + + Used for logging and audit trail source identification. + Examples: "pipdeptree" + """ + ... + + @property + def priority(self) -> int: + """Priority for expander selection (lower = higher priority). + + When multiple expanders support a lockfile, the one with + lowest priority value is chosen. + """ + ... + + @property + def ecosystems(self) -> list[str]: + """Package ecosystems this expander supports. + + Examples: ["pypi"], ["npm"] + """ + ... + + def supports(self, lock_file: Path) -> bool: + """Check if this expander supports the given lockfile. + + Args: + lock_file: Path to the lockfile + + Returns: + True if this expander can process the lockfile. + """ + ... + + def can_expand(self) -> bool: + """Check if expansion is possible in the current environment. + + This checks prerequisites like: + - Required tool is installed + - Packages from lockfile are installed in environment + + Returns: + True if expand() can be called successfully. + """ + ... + + def expand(self, lock_file: Path) -> list[DiscoveredDependency]: + """Discover transitive dependencies. + + Implementations should: + 1. Parse the lockfile to identify direct dependencies + 2. Use ecosystem tooling to discover transitive dependencies + 3. Return dependencies NOT in the original lockfile + + Args: + lock_file: Path to the lockfile + + Returns: + List of discovered transitive dependencies. + Empty list if none found or expansion not possible. + """ + ... diff --git a/sbomify_action/_dependency_expansion/registry.py b/sbomify_action/_dependency_expansion/registry.py new file mode 100644 index 0000000..16d1359 --- /dev/null +++ b/sbomify_action/_dependency_expansion/registry.py @@ -0,0 +1,90 @@ +"""Registry for dependency expanders.""" + +from pathlib import Path + +from ..logging_config import logger +from .models import DiscoveredDependency +from .protocol import DependencyExpander + + +class ExpanderRegistry: + """Registry for dependency expanders. + + Manages expander instances and selects the appropriate expander + based on lockfile type and priority. 
+ + Example: + registry = ExpanderRegistry() + registry.register(PipdeptreeExpander()) + + expander = registry.get_expander_for(Path("requirements.txt")) + if expander and expander.can_expand(): + deps = expander.expand(Path("requirements.txt")) + """ + + def __init__(self) -> None: + self._expanders: list[DependencyExpander] = [] + + def register(self, expander: DependencyExpander) -> None: + """Register an expander. + + Args: + expander: Expander instance implementing DependencyExpander protocol. + """ + self._expanders.append(expander) + logger.debug(f"Registered dependency expander: {expander.name} (priority {expander.priority})") + + def get_expander_for(self, lock_file: Path) -> DependencyExpander | None: + """Get the best expander for this lockfile. + + Selects the expander with lowest priority that supports + the lockfile. + + Args: + lock_file: Path to the lockfile + + Returns: + Expander instance if found, None otherwise. + """ + candidates = [e for e in self._expanders if e.supports(lock_file)] + + if not candidates: + return None + + # Sort by priority (lowest first) + candidates.sort(key=lambda e: e.priority) + return candidates[0] + + def expand_lockfile(self, lock_file: Path) -> list[DiscoveredDependency]: + """Expand a lockfile using the appropriate expander. + + Args: + lock_file: Path to the lockfile + + Returns: + List of discovered dependencies. Empty list if no expander + found or expansion not possible. + """ + expander = self.get_expander_for(lock_file) + + if expander is None: + logger.debug(f"No dependency expander found for: {lock_file.name}") + return [] + + if not expander.can_expand(): + logger.debug(f"Expander {expander.name} cannot expand (prerequisites not met)") + return [] + + logger.debug(f"Using {expander.name} to expand {lock_file.name}") + try: + dependencies = expander.expand(lock_file) + logger.debug(f"Discovered {len(dependencies)} transitive dependencies") + return dependencies + except Exception as e: + logger.warning(f"Failed to expand {lock_file.name}: {e}") + return [] + + @property + def registered_expanders(self) -> list[str]: + """Get names of all registered expanders.""" + return [e.name for e in self._expanders] diff --git a/sbomify_action/cli/main.py b/sbomify_action/cli/main.py index 6f5ed78..1e579ff 100644 --- a/sbomify_action/cli/main.py +++ b/sbomify_action/cli/main.py @@ -20,6 +20,7 @@ from ..augmentation import augment_sbom_from_file from ..console import ( get_audit_trail, + gha_group, gha_notice, print_component_not_found_error, print_duplicate_sbom_error, @@ -908,24 +909,24 @@ def print_banner() -> None: console_print_banner(SBOMIFY_VERSION) -def _log_step_header(step_num: int, title: str, emoji: str = "") -> None: +def _log_step_header(step_num: int | float, title: str, emoji: str = "") -> None: """ Log a nicely formatted step header optimized for GitHub Actions. Args: - step_num: Step number (1-6) + step_num: Step number (e.g., 1, 2, or substeps like 1.4, 1.5) title: Step title emoji: Optional emoji to include (deprecated, will be ignored) """ print_step_header(step_num, title) -def _log_step_end(step_num: int, success: bool = True) -> None: +def _log_step_end(step_num: int | float, success: bool = True) -> None: """ Log step completion and close GitHub Actions group if applicable. 
Args: - step_num: Step number (1-6) + step_num: Step number (e.g., 1, 2, or substeps like 1.4, 1.5) success: Whether the step completed successfully """ print_step_end(step_num, success) @@ -1071,6 +1072,53 @@ def run_pipeline(config: Config) -> None: ) # Don't fail the entire process for additional packages injection issues + # Step 1.4: Transitive Dependency Discovery (for lockfiles that support expansion) + # Note: Steps 1.x are substeps of the main SBOM generation step (Step 1). + # These run after initial generation but before Step 2 (Validation/Augmentation). + # Uses registry pattern to check if any expander supports the lockfile + if config.lock_file: + from sbomify_action._dependency_expansion import supports_dependency_expansion + + if supports_dependency_expansion(config.lock_file): + _log_step_header(1.4, "Transitive Dependency Discovery") + try: + from sbomify_action._dependency_expansion import expand_sbom_dependencies + + logger.info("Discovering transitive dependencies...") + + result = expand_sbom_dependencies( + sbom_file=STEP_1_FILE, + lock_file=config.lock_file, + ) + + if result.added_count > 0: + logger.info( + f"Added {result.added_count} transitive dependencies (discovered {result.discovered_count} total)" + ) + # Log discovered packages in collapsible group + with gha_group("Discovered Transitive Dependencies"): + for dep in result.dependencies[:50]: + parent_info = f" (via {dep.parent})" if dep.parent else "" + print(f" {dep.purl}{parent_info}") + if len(result.dependencies) > 50: + print(f" ... and {len(result.dependencies) - 50} more") + else: + if result.discovered_count > 0: + logger.info( + f"No new dependencies to add ({result.discovered_count} discovered were already in SBOM)" + ) + else: + logger.info( + "No transitive dependencies discovered (packages may not be installed, or all deps are direct)" + ) + + _log_step_end(1.4) + + except Exception as e: + logger.warning(f"Transitive dependency discovery failed (non-fatal): {e}") + _log_step_end(1.4, success=False) + # Don't fail the entire process - this is an enhancement + # Step 1.5: Hash Enrichment from Lockfile (if lockfile was used for generation) if config.lock_file: _log_step_header(1.5, "Hash Enrichment from Lockfile") diff --git a/sbomify_action/console.py b/sbomify_action/console.py index c56cd29..233d47c 100644 --- a/sbomify_action/console.py +++ b/sbomify_action/console.py @@ -106,7 +106,7 @@ def print_banner(version: str = "unknown") -> None: console.print(banner) -def print_step_header(step_num: int, title: str) -> None: +def print_step_header(step_num: int | float, title: str) -> None: """ Print a styled step header. @@ -114,7 +114,7 @@ def print_step_header(step_num: int, title: str) -> None: In other environments, uses Rich styling. Args: - step_num: Step number (1-6) + step_num: Step number (e.g., 1, 2, or substeps like 1.4, 1.5) title: Step title """ step_title = f"STEP {step_num}: {title}" @@ -129,12 +129,12 @@ def print_step_header(step_num: int, title: str) -> None: console.rule(f"[bold blue]{step_title}[/bold blue]", style="blue") -def print_step_end(step_num: int, success: bool = True) -> None: +def print_step_end(step_num: int | float, success: bool = True) -> None: """ Print step completion status and close GitHub Actions group. 
Args: - step_num: Step number (1-6) + step_num: Step number (e.g., 1, 2, or substeps like 1.4, 1.5) success: Whether the step completed successfully """ if success: diff --git a/tests/test_dependency_expansion.py b/tests/test_dependency_expansion.py new file mode 100644 index 0000000..6c1e48d --- /dev/null +++ b/tests/test_dependency_expansion.py @@ -0,0 +1,372 @@ +"""Tests for the dependency expansion subsystem.""" + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +from sbomify_action._dependency_expansion import ( + DiscoveredDependency, + ExpansionResult, + create_default_registry, + expand_sbom_dependencies, + normalize_python_package_name, +) +from sbomify_action._dependency_expansion.expanders.pipdeptree import PipdeptreeExpander +from sbomify_action._dependency_expansion.registry import ExpanderRegistry + + +class TestNormalizePythonPackageName: + """Tests for normalize_python_package_name function.""" + + def test_lowercase(self): + """Test that names are lowercased.""" + assert normalize_python_package_name("Django") == "django" + assert normalize_python_package_name("REQUESTS") == "requests" + + def test_hyphen_to_underscore(self): + """Test that hyphens become underscores.""" + assert normalize_python_package_name("my-package") == "my_package" + assert normalize_python_package_name("some-long-name") == "some_long_name" + + def test_dot_to_underscore(self): + """Test that dots become underscores.""" + assert normalize_python_package_name("zope.interface") == "zope_interface" + + def test_mixed_normalization(self): + """Test mixed case with various separators.""" + assert normalize_python_package_name("My-Package.Name") == "my_package_name" + + +class TestDiscoveredDependency: + """Tests for DiscoveredDependency dataclass.""" + + def test_basic_creation(self): + """Test basic dataclass creation.""" + dep = DiscoveredDependency( + name="urllib3", + version="2.0.4", + purl="pkg:pypi/urllib3@2.0.4", + parent="requests", + depth=1, + ) + assert dep.name == "urllib3" + assert dep.version == "2.0.4" + assert dep.purl == "pkg:pypi/urllib3@2.0.4" + assert dep.parent == "requests" + assert dep.depth == 1 + assert dep.ecosystem == "pypi" # Default + + def test_defaults(self): + """Test default values.""" + dep = DiscoveredDependency( + name="certifi", + version="2023.7.22", + purl="pkg:pypi/certifi@2023.7.22", + ) + assert dep.parent is None + assert dep.depth == 1 + assert dep.ecosystem == "pypi" + + +class TestExpansionResult: + """Tests for ExpansionResult dataclass.""" + + def test_basic_creation(self): + """Test basic dataclass creation.""" + result = ExpansionResult( + original_count=5, + discovered_count=10, + added_count=8, + dependencies=[], + source="pipdeptree", + ) + assert result.original_count == 5 + assert result.discovered_count == 10 + assert result.added_count == 8 + assert result.source == "pipdeptree" + + def test_source_required(self): + """Test that source is a required field.""" + # source must be provided explicitly - no default value + result = ExpansionResult( + original_count=0, + discovered_count=0, + added_count=0, + dependencies=[], + source="test-expander", + ) + assert result.source == "test-expander" + + +class TestPipdeptreeExpander: + """Tests for PipdeptreeExpander class.""" + + def test_name(self): + """Test expander name.""" + expander = PipdeptreeExpander() + assert expander.name == "pipdeptree" + + def test_priority(self): + """Test expander priority.""" + expander = PipdeptreeExpander() + assert expander.priority 
== 10 + + def test_ecosystems(self): + """Test supported ecosystems.""" + expander = PipdeptreeExpander() + assert "pypi" in expander.ecosystems + + @patch("sbomify_action._dependency_expansion.expanders.pipdeptree._PIPDEPTREE_AVAILABLE", True) + def test_supports_requirements_txt(self): + """Test that requirements.txt is supported.""" + expander = PipdeptreeExpander() + assert expander.supports(Path("requirements.txt")) + + @patch("sbomify_action._dependency_expansion.expanders.pipdeptree._PIPDEPTREE_AVAILABLE", True) + def test_not_supports_other_lockfiles(self): + """Test that other lockfiles are not supported.""" + expander = PipdeptreeExpander() + assert not expander.supports(Path("poetry.lock")) + assert not expander.supports(Path("Pipfile.lock")) + assert not expander.supports(Path("uv.lock")) + + @patch("sbomify_action._dependency_expansion.expanders.pipdeptree._PIPDEPTREE_AVAILABLE", False) + def test_not_supports_when_tool_unavailable(self): + """Test that nothing is supported when pipdeptree is unavailable.""" + expander = PipdeptreeExpander() + assert not expander.supports(Path("requirements.txt")) + + def test_parse_requirements_basic(self, tmp_path): + """Test parsing basic requirements.txt.""" + req_file = tmp_path / "requirements.txt" + req_file.write_text( + """ +requests==2.31.0 +django>=4.0 +flask +""" + ) + + expander = PipdeptreeExpander() + deps = expander._parse_requirements(req_file) + + assert "requests" in deps + assert deps["requests"] == "2.31.0" + assert "django" in deps + assert deps["django"] is None # No exact version + assert "flask" in deps + assert deps["flask"] is None + + def test_parse_requirements_with_comments(self, tmp_path): + """Test parsing requirements.txt with comments.""" + req_file = tmp_path / "requirements.txt" + req_file.write_text( + """ +# This is a comment +requests==2.31.0 + +# Another comment +django>=4.0 # inline comment +""" + ) + + expander = PipdeptreeExpander() + deps = expander._parse_requirements(req_file) + + assert "requests" in deps + assert "django" in deps + assert len(deps) == 2 + + def test_parse_requirements_with_extras(self, tmp_path): + """Test parsing requirements with extras.""" + req_file = tmp_path / "requirements.txt" + req_file.write_text("requests[security]==2.31.0\n") + + expander = PipdeptreeExpander() + deps = expander._parse_requirements(req_file) + + assert "requests" in deps + assert deps["requests"] == "2.31.0" + + def test_parse_requirements_with_options(self, tmp_path): + """Test that option lines are skipped.""" + req_file = tmp_path / "requirements.txt" + req_file.write_text( + """ +-r base.txt +--index-url https://pypi.org/simple +-e git+https://github.com/user/repo.git +requests==2.31.0 +""" + ) + + expander = PipdeptreeExpander() + deps = expander._parse_requirements(req_file) + + # Only requests should be parsed + assert len(deps) == 1 + assert "requests" in deps + + def test_parse_requirements_with_markers(self, tmp_path): + """Test parsing requirements with environment markers.""" + req_file = tmp_path / "requirements.txt" + req_file.write_text('requests==2.31.0; python_version >= "3.6"\n') + + expander = PipdeptreeExpander() + deps = expander._parse_requirements(req_file) + + assert "requests" in deps + assert deps["requests"] == "2.31.0" + + def test_collect_transitives(self): + """Test transitive dependency collection.""" + expander = PipdeptreeExpander() + + # Simulated pipdeptree JSON tree + tree = [ + { + "package_name": "requests", + "installed_version": "2.31.0", + "dependencies": [ + { 
+ "package_name": "urllib3", + "installed_version": "2.0.4", + "dependencies": [], + }, + { + "package_name": "certifi", + "installed_version": "2023.7.22", + "dependencies": [], + }, + ], + } + ] + + direct_names = {"requests"} + discovered: list[DiscoveredDependency] = [] + seen_package_versions: set[str] = set() + + for pkg in tree: + expander._collect_transitives(pkg, direct_names, discovered, seen_package_versions, depth=0) + + # Should discover urllib3 and certifi as transitives + assert len(discovered) == 2 + purl_names = {d.purl for d in discovered} + assert "pkg:pypi/urllib3@2.0.4" in purl_names + assert "pkg:pypi/certifi@2023.7.22" in purl_names + + # Both should have requests as parent + for dep in discovered: + assert dep.parent == "requests" + assert dep.depth == 1 + + +class TestExpanderRegistry: + """Tests for ExpanderRegistry class.""" + + def test_register_and_get(self): + """Test registering and retrieving expanders.""" + registry = ExpanderRegistry() + expander = PipdeptreeExpander() + + registry.register(expander) + + # Mock pipdeptree as available for this test + with patch.object(expander, "supports", return_value=True): + result = registry.get_expander_for(Path("requirements.txt")) + assert result is not None + assert result.name == "pipdeptree" + + def test_no_expander_found(self): + """Test when no expander supports the lockfile.""" + registry = ExpanderRegistry() + result = registry.get_expander_for(Path("unknown.lock")) + assert result is None + + def test_priority_ordering(self): + """Test that expanders are selected by priority.""" + registry = ExpanderRegistry() + + # Create mock expanders with different priorities + low_priority = MagicMock() + low_priority.name = "low" + low_priority.priority = 100 + low_priority.supports.return_value = True + + high_priority = MagicMock() + high_priority.name = "high" + high_priority.priority = 10 + high_priority.supports.return_value = True + + registry.register(low_priority) + registry.register(high_priority) + + result = registry.get_expander_for(Path("test.txt")) + assert result.name == "high" + + +class TestDefaultRegistry: + """Tests for default registry creation.""" + + def test_create_default_registry(self): + """Test that default registry is created with pipdeptree.""" + registry = create_default_registry() + assert "pipdeptree" in registry.registered_expanders + + +class TestExpandSbomDependencies: + """Integration tests for expand_sbom_dependencies function.""" + + def test_returns_empty_result_for_unsupported_lockfile(self, tmp_path): + """Test that unsupported lockfiles return empty result.""" + # Create a minimal SBOM + sbom_file = tmp_path / "sbom.json" + sbom_file.write_text( + json.dumps( + { + "bomFormat": "CycloneDX", + "specVersion": "1.6", + "components": [], + } + ) + ) + + # Create an unsupported lockfile + lock_file = tmp_path / "unknown.lock" + lock_file.write_text("some content") + + result = expand_sbom_dependencies(str(sbom_file), str(lock_file)) + + assert result.added_count == 0 + assert result.discovered_count == 0 + assert result.source == "none" + + @patch("sbomify_action._dependency_expansion.expanders.pipdeptree._PIPDEPTREE_AVAILABLE", True) + @patch("sbomify_action._dependency_expansion.expanders.pipdeptree.PipdeptreeExpander.can_expand") + def test_skips_when_cannot_expand(self, mock_can_expand, tmp_path): + """Test that expansion is skipped when prerequisites not met.""" + mock_can_expand.return_value = False + + # Create a minimal SBOM + sbom_file = tmp_path / "sbom.json" + 
original_content = json.dumps( + { + "bomFormat": "CycloneDX", + "specVersion": "1.6", + "components": [], + } + ) + sbom_file.write_text(original_content) + + # Create requirements.txt + lock_file = tmp_path / "requirements.txt" + lock_file.write_text("requests==2.31.0") + + result = expand_sbom_dependencies(str(sbom_file), str(lock_file)) + + assert result.added_count == 0 + assert result.discovered_count == 0 + assert result.source == "pipdeptree" + + # Verify SBOM file was not modified + assert sbom_file.read_text() == original_content diff --git a/uv.lock b/uv.lock index 0cef9de..b0a1f4e 100644 --- a/uv.lock +++ b/uv.lock @@ -956,6 +956,15 @@ version = "1.18.1" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/ee/c0/53a2f017ac5b5397a7064c2654b73c3334ac8461315707cbede6c12199eb/patch-ng-1.18.1.tar.gz", hash = "sha256:52fd46ee46f6c8667692682c1fd7134edc65a2d2d084ebec1d295a6087fc0291", size = 17913, upload-time = "2024-10-25T12:20:10.591Z" } +[[package]] +name = "pip" +version = "26.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/44/c2/65686a7783a7c27a329706207147e82f23c41221ee9ae33128fc331670a0/pip-26.0.tar.gz", hash = "sha256:3ce220a0a17915972fbf1ab451baae1521c4539e778b28127efa79b974aff0fa", size = 1812654, upload-time = "2026-01-31T01:40:54.361Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/69/00/5ac7aa77688ec4d34148b423d34dc0c9bc4febe0d872a9a1ad9860b2f6f1/pip-26.0-py3-none-any.whl", hash = "sha256:98436feffb9e31bc9339cf369fd55d3331b1580b6a6f1173bacacddcf9c34754", size = 1787564, upload-time = "2026-01-31T01:40:52.252Z" }, +] + [[package]] name = "pip-requirements-parser" version = "32.0.1" @@ -969,6 +978,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/d0/d04f1d1e064ac901439699ee097f58688caadea42498ec9c4b4ad2ef84ab/pip_requirements_parser-32.0.1-py3-none-any.whl", hash = "sha256:4659bc2a667783e7a15d190f6fccf8b2486685b6dba4c19c3876314769c57526", size = 35648, upload-time = "2022-12-21T15:25:21.046Z" }, ] +[[package]] +name = "pipdeptree" +version = "2.30.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "packaging" }, + { name = "pip" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/03/65/f1881a1b6c49e9c94a463f7c52a23cbadcb0e778418056c9f97cbfae4ede/pipdeptree-2.30.0.tar.gz", hash = "sha256:0f78fe4bcf36a72d0d006aee0f4e315146cb278e4c4d51621f370a3d6b8861c1", size = 42737, upload-time = "2025-11-12T04:16:20.315Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/90/3d/21d6b9a0d04c6aa48621d500af4648435e4b0906437c8474ddb188249436/pipdeptree-2.30.0-py3-none-any.whl", hash = "sha256:e08ee7eb8152c0d67aee308c8477a489ab0af1a4aafe988d9d2d9998f78a24a6", size = 34017, upload-time = "2025-11-12T04:16:18.835Z" }, +] + [[package]] name = "platformdirs" version = "4.5.1" @@ -1457,6 +1479,7 @@ dependencies = [ { name = "cyclonedx-bom" }, { name = "cyclonedx-python-lib" }, { name = "packageurl-python" }, + { name = "pipdeptree" }, { name = "questionary" }, { name = "requests" }, { name = "rich" }, @@ -1482,6 +1505,7 @@ requires-dist = [ { name = "cyclonedx-bom", specifier = ">=7.2.1,<8" }, { name = "cyclonedx-python-lib", specifier = ">=11.5.0,<12" }, { name = "packageurl-python", specifier = ">=0.17.6" }, + { name = "pipdeptree", specifier = ">=2.0.0" }, { name = "questionary", specifier = ">=2.0.1,<3" }, { name = "requests", specifier = ">=2.32.3,<3" }, { name = "rich", specifier = 
">=14.2.0" },