Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions sbomify_action/_enrichment/sources/conan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from ..metadata import NormalizedMetadata
from ..sanitization import normalize_vcs_url
from .purl import PURL_TYPE_TO_SUPPLIER

# Simple in-memory cache
_cache: Dict[str, Optional[NormalizedMetadata]] = {}
Expand Down Expand Up @@ -227,15 +228,17 @@ def _extract_metadata_from_graph(self, package_name: str, graph: Any) -> Optiona
if repository_url:
field_sources["repository_url"] = self.name

# Use author as supplier if available
supplier = author if author else None
if supplier:
field_sources["supplier"] = self.name
# Supplier is always the distribution platform
field_sources["supplier"] = self.name

# Preserve author info as maintainer_name
maintainer_name = author if author else None

metadata = NormalizedMetadata(
description=description,
licenses=licenses,
supplier=supplier,
supplier=PURL_TYPE_TO_SUPPLIER["conan"],
maintainer_name=maintainer_name,
homepage=homepage,
repository_url=repository_url,
registry_url=f"https://conan.io/center/recipes/{package_name}",
Expand Down
9 changes: 4 additions & 5 deletions sbomify_action/_enrichment/sources/cratesio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..license_utils import normalize_license_list
from ..metadata import NormalizedMetadata
from ..sanitization import normalize_vcs_url
from .purl import PURL_TYPE_TO_SUPPLIER

CRATESIO_API_BASE = "https://crates.io/api/v1/crates"
DEFAULT_TIMEOUT = 10 # seconds
Expand Down Expand Up @@ -161,8 +162,8 @@ def _normalize_response(
field_sources["description"] = self.name
if licenses:
field_sources["licenses"] = self.name
if maintainer_name:
field_sources["supplier"] = self.name
# Supplier is always the distribution platform
field_sources["supplier"] = self.name
if homepage:
field_sources["homepage"] = self.name
if repository_url:
Expand All @@ -174,9 +175,7 @@ def _normalize_response(
description=description,
licenses=licenses,
license_texts=license_texts,
# supplier is the NTIA-required field; maintainer_name provides additional detail.
# For crates.io, the publisher (published_by) serves as both.
supplier=maintainer_name,
supplier=PURL_TYPE_TO_SUPPLIER["cargo"],
homepage=homepage,
repository_url=repository_url,
documentation_url=documentation,
Expand Down
16 changes: 13 additions & 3 deletions sbomify_action/_enrichment/sources/depsdev.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ..metadata import NormalizedMetadata
from ..sanitization import normalize_vcs_url
from ..utils import get_qualified_name
from .purl import PURL_TYPE_TO_SUPPLIER

DEPSDEV_API_BASE = "https://api.deps.dev/v3"
DEFAULT_TIMEOUT = 10 # seconds - deps.dev is generally fast
Expand Down Expand Up @@ -108,7 +109,7 @@ def fetch(self, purl: PackageURL, session: requests.Session) -> Optional[Normali
metadata = None
if response.status_code == 200:
data = response.json()
metadata = self._normalize_response(purl.name, data)
metadata = self._normalize_response(purl.name, purl.type, data)
elif response.status_code == 404:
logger.debug(f"Package not found in deps.dev: {purl}")
else:
Expand All @@ -131,12 +132,15 @@ def fetch(self, purl: PackageURL, session: requests.Session) -> Optional[Normali
_cache[cache_key] = None
return None

def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Optional[NormalizedMetadata]:
def _normalize_response(
self, package_name: str, purl_type: str, data: Dict[str, Any]
) -> Optional[NormalizedMetadata]:
"""
Normalize deps.dev API response to NormalizedMetadata.

Args:
package_name: Name of the package
purl_type: PURL type (e.g., "pypi", "npm", "cargo")
data: Raw deps.dev API response

Returns:
Expand Down Expand Up @@ -179,17 +183,23 @@ def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Option
if repository_url:
repository_url = normalize_vcs_url(repository_url)

# Get supplier from PURL type mapping
supplier = PURL_TYPE_TO_SUPPLIER.get(purl_type)

# Build field_sources for attribution
field_sources = {}
field_sources: dict[str, str] = {}
if licenses:
field_sources["licenses"] = self.name
if homepage:
field_sources["homepage"] = self.name
if repository_url:
field_sources["repository_url"] = self.name
if supplier:
field_sources["supplier"] = self.name

metadata = NormalizedMetadata(
licenses=licenses,
supplier=supplier,
homepage=homepage,
repository_url=repository_url,
source=self.name,
Expand Down
24 changes: 7 additions & 17 deletions sbomify_action/_enrichment/sources/ecosystems.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..metadata import NormalizedMetadata
from ..sanitization import normalize_vcs_url
from ..utils import purl_to_string
from .purl import PURL_TYPE_TO_SUPPLIER

ECOSYSTEMS_API_BASE = "https://packages.ecosyste.ms/api/v1"
DEFAULT_TIMEOUT = 15 # seconds - ecosyste.ms can be slower
Expand Down Expand Up @@ -87,9 +88,9 @@ def fetch(self, purl: PackageURL, session: requests.Session) -> Optional[Normali
data = response.json()
# API returns an array, take first result
if isinstance(data, list) and len(data) > 0:
metadata = self._normalize_response(data[0])
metadata = self._normalize_response(purl.type, data[0])
elif isinstance(data, dict):
metadata = self._normalize_response(data)
metadata = self._normalize_response(purl.type, data)
else:
logger.debug(f"No package data found in ecosyste.ms for: {purl_str}")
elif response.status_code == 404:
Expand Down Expand Up @@ -119,11 +120,12 @@ def fetch(self, purl: PackageURL, session: requests.Session) -> Optional[Normali
_cache[cache_key] = None
return None

def _normalize_response(self, data: Dict[str, Any]) -> Optional[NormalizedMetadata]:
def _normalize_response(self, purl_type: str, data: Dict[str, Any]) -> Optional[NormalizedMetadata]:
"""
Normalize ecosyste.ms API response to NormalizedMetadata.

Args:
purl_type: PURL type (e.g., "pypi", "npm", "cargo")
data: Raw ecosyste.ms API response

Returns:
Expand Down Expand Up @@ -151,20 +153,8 @@ def _normalize_response(self, data: Dict[str, Any]) -> Optional[NormalizedMetada
maintainer_name = first_maintainer.get("name") or first_maintainer.get("login")
maintainer_email = first_maintainer.get("email")

# Extract supplier from maintainer or repo owner
# NEVER use ecosystem name as supplier - "pypi", "npm", etc. are platforms, not suppliers
supplier = None
# Priority 1: Maintainer name or login (already extracted above)
if maintainer_name:
supplier = maintainer_name
# Priority 2: Repo owner name or login
elif data.get("repo_metadata") and data["repo_metadata"].get("owner"):
owner = data["repo_metadata"]["owner"]
if isinstance(owner, dict):
supplier = owner.get("name") or owner.get("login")
elif isinstance(owner, str):
supplier = owner
# Do NOT fall back to data["ecosystem"] - it's just the platform name
# Supplier is the distribution platform based on PURL type
supplier = PURL_TYPE_TO_SUPPLIER.get(purl_type)

# Extract issue tracker URL from repo metadata
issue_tracker_url = None
Expand Down
18 changes: 8 additions & 10 deletions sbomify_action/_enrichment/sources/pubdev.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ..metadata import NormalizedMetadata
from ..sanitization import normalize_vcs_url
from ..utils import parse_author_string
from .purl import PURL_TYPE_TO_SUPPLIER

PUBDEV_API_BASE = "https://pub.dev/api/packages"
DEFAULT_TIMEOUT = 10 # seconds - pub.dev is generally fast
Expand Down Expand Up @@ -134,26 +135,23 @@ def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Option
documentation_url = pubspec.get("documentation")
issue_tracker_url = pubspec.get("issue_tracker")

# Extract publisher/author info using shared utility
supplier = None
# Extract author info for maintainer_name field
maintainer_name = None
maintainer_email = None

# pub.dev uses 'authors' (list) or 'author' (string) in older pubspecs
authors = pubspec.get("authors")
if authors and isinstance(authors, list) and len(authors) > 0:
maintainer_name, maintainer_email = parse_author_string(authors[0])
supplier = maintainer_name
elif pubspec.get("author"):
maintainer_name, maintainer_email = parse_author_string(pubspec["author"])
supplier = maintainer_name

# Check for publisher in the top-level response (newer pub.dev API)
# Publisher takes precedence over author for supplier
# Use publisher ID as maintainer_name if available
if data.get("publisher"):
publisher_id = data["publisher"].get("publisherId")
if publisher_id:
supplier = publisher_id
if publisher_id and not maintainer_name:
maintainer_name = publisher_id

logger.debug(f"Successfully fetched pub.dev metadata for: {package_name}")

Expand All @@ -163,8 +161,8 @@ def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Option
field_sources["description"] = self.name
if licenses:
field_sources["licenses"] = self.name
if supplier:
field_sources["supplier"] = self.name
# Supplier is always the distribution platform
field_sources["supplier"] = self.name
if homepage:
field_sources["homepage"] = self.name
if repository_url:
Expand All @@ -178,7 +176,7 @@ def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Option
description=description,
licenses=licenses,
license_texts=license_texts,
supplier=supplier,
supplier=PURL_TYPE_TO_SUPPLIER["pub"],
homepage=homepage,
repository_url=repository_url,
documentation_url=documentation_url,
Expand Down
56 changes: 56 additions & 0 deletions sbomify_action/_enrichment/sources/purl.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,58 @@
"chainguard": "Chainguard, Inc.",
}

# Supplier names keyed by PURL type.
# For language packages the supplier recorded is the distribution platform
# the package is served from, not the package author.

# Language package registries.
_LANGUAGE_REGISTRY_SUPPLIERS: dict[str, str] = {
    "pypi": "Python Package Index (PyPI)",
    "npm": "npm",
    "cargo": "crates.io",
    "maven": "Maven Central",
    "gem": "RubyGems.org",
    "nuget": "NuGet Gallery",
    "golang": "Go Modules",
    "pub": "pub.dev",
    "conan": "Conan Center",
    "composer": "Packagist",
    "hex": "Hex.pm",
    "cocoapods": "CocoaPods",
    "conda": "Anaconda",
    "hackage": "Hackage",
    "swift": "Swift Package Registry",
}

# Container registries.
_CONTAINER_REGISTRY_SUPPLIERS: dict[str, str] = {
    "docker": "Docker Hub",
    "oci": "OCI Registry",
}

# Public lookup table: language registries first, then container registries,
# so iteration order matches the order in which the entries are declared.
PURL_TYPE_TO_SUPPLIER: dict[str, str] = {
    **_LANGUAGE_REGISTRY_SUPPLIERS,
    **_CONTAINER_REGISTRY_SUPPLIERS,
}


def get_supplier_for_purl(purl: PackageURL) -> str | None:
    """Resolve the supplier name to record for a package URL.

    Two lookup strategies are used:

    * When the PURL type is one of ``OS_PACKAGE_TYPES`` and the PURL carries
      a namespace, the lower-cased namespace is looked up in
      ``NAMESPACE_TO_SUPPLIER``. A namespace with no entry there yields
      ``"<Namespace> Project"`` (the namespace title-cased).
    * In every other case the PURL type is looked up in
      ``PURL_TYPE_TO_SUPPLIER``, which maps a type to its distribution
      platform.

    Args:
        purl: Parsed PackageURL

    Returns:
        The supplier name, or None when the type-based lookup has no entry
        for the PURL type.
    """
    namespace = purl.namespace

    # Anything that is not a namespaced OS package is resolved by PURL type.
    if purl.type not in OS_PACKAGE_TYPES or not namespace:
        return PURL_TYPE_TO_SUPPLIER.get(purl.type)

    # Namespaced OS package: prefer the known distribution name and fall
    # back to a name derived from the namespace when it is not in the table.
    known_supplier = NAMESPACE_TO_SUPPLIER.get(namespace.lower())
    return known_supplier or f"{namespace.title()} Project"


# Mapping of PURL type/namespace to package tracker URL templates
PACKAGE_TRACKER_URLS: Dict[str, Dict[str, str]] = {
"deb": {
Expand Down Expand Up @@ -128,11 +180,15 @@ def fetch(self, purl: PackageURL, session: requests.Session) -> Optional[Normali
field_sources = {}
if supplier:
field_sources["supplier"] = self.name
# For OS packages, the distribution is also the maintainer/publisher
field_sources["maintainer_name"] = self.name
if homepage:
field_sources["homepage"] = self.name

return NormalizedMetadata(
supplier=supplier,
# For OS packages, distribution is the publisher (maintainer_name -> component.publisher)
maintainer_name=supplier,
homepage=homepage,
source=self.name,
field_sources=field_sources,
Expand Down
7 changes: 4 additions & 3 deletions sbomify_action/_enrichment/sources/pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ..metadata import NormalizedMetadata
from ..sanitization import normalize_vcs_url
from ..utils import parse_author_string
from .purl import PURL_TYPE_TO_SUPPLIER

PYPI_API_BASE = "https://pypi.org/pypi"
DEFAULT_TIMEOUT = 10 # seconds - PyPI is fast
Expand Down Expand Up @@ -169,8 +170,8 @@ def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Normal
field_sources["description"] = self.name
if licenses:
field_sources["licenses"] = self.name
if maintainer_name:
field_sources["supplier"] = self.name
# Supplier is always the distribution platform
field_sources["supplier"] = self.name
if homepage:
field_sources["homepage"] = self.name
if repository_url:
Expand All @@ -184,7 +185,7 @@ def _normalize_response(self, package_name: str, data: Dict[str, Any]) -> Normal
description=info.get("summary"),
licenses=licenses,
license_texts=license_texts,
supplier=maintainer_name, # Use author/maintainer as supplier
supplier=PURL_TYPE_TO_SUPPLIER["pypi"],
homepage=homepage,
repository_url=repository_url,
documentation_url=documentation_url,
Expand Down
16 changes: 12 additions & 4 deletions sbomify_action/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from cyclonedx.model import ExternalReference, ExternalReferenceType, Property, XsUri
from cyclonedx.model.bom import Bom
from cyclonedx.model.component import Component, ComponentType
from cyclonedx.model.contact import OrganizationalEntity
from cyclonedx.model.license import LicenseExpression
from spdx_tools.spdx.model import (
Actor,
Expand Down Expand Up @@ -417,12 +418,19 @@ def _apply_metadata_to_cyclonedx_component(
component.licenses.add(license_expr)
added_fields.append("license")

# Publisher (sanitized)
if not component.publisher and metadata.supplier:
# Publisher - use maintainer_name (author), not supplier (distribution platform)
if not component.publisher and metadata.maintainer_name:
sanitized_publisher = sanitize_supplier(metadata.maintainer_name)
if sanitized_publisher:
component.publisher = sanitized_publisher
added_fields.append("publisher")

# Supplier - use supplier (distribution platform like PyPI, npm, etc.)
if not component.supplier and metadata.supplier:
sanitized_supplier = sanitize_supplier(metadata.supplier)
if sanitized_supplier:
component.publisher = sanitized_supplier
added_fields.append("publisher")
component.supplier = OrganizationalEntity(name=sanitized_supplier)
added_fields.append("supplier")

# External references helper (with URL sanitization)
def _add_external_ref(ref_type: ExternalReferenceType, url: str, field_name: str = "url") -> bool:
Expand Down
Loading