diff --git a/.gitignore b/.gitignore index 4bc971ba4..ddf49dfd0 100644 --- a/.gitignore +++ b/.gitignore @@ -181,3 +181,4 @@ docs/_build bin/ requirements.txt .macaron_env_file +.DS_Store diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py index bd829a0f1..b3ce0ced0 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py @@ -37,6 +37,9 @@ class Heuristics(str, Enum): #: Indicates that the package has an unusually large version number for a single release. ANOMALOUS_VERSION = "anomalous_version" + #: Indicates that the package has a similar structure to other packages maintained by the same user. + SIMILAR_PROJECTS = "similar_projects" + class HeuristicResult(str, Enum): """Result type indicating the outcome of a heuristic.""" diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/similar_projects.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/similar_projects.py new file mode 100644 index 000000000..60930021e --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/similar_projects.py @@ -0,0 +1,233 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""This analyzer checks if the package has a similar structure to other packages maintained by the same user."""

import hashlib
import logging
import re
import tarfile
import typing

import requests
from bs4 import BeautifulSoup

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)

#: Timeout, in seconds, applied to every HTTP request sent to PyPI.
REQUEST_TIMEOUT: int = 10

#: Runs of these characters are interchangeable in PyPI project names (PEP 503 name normalization).
NAME_SEPARATORS: str = r"[-_.]+"


class SimilarProjectAnalyzer(BaseHeuristicAnalyzer):
    """Check whether the package has a similar structure to other packages maintained by the same user."""

    def __init__(self) -> None:
        super().__init__(
            name="similar_project_analyzer",
            heuristic=Heuristics.SIMILAR_PROJECTS,
            depends_on=None,
        )

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        The structure hash of the target package's sdist is compared with the structure hash of
        every other package maintained by the same users. The first match fails the heuristic.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        package_name = pypi_package_json.component_name
        target_hash = self.get_structure_hash(package_name)
        if target_hash is None:
            return HeuristicResult.SKIP, {
                "message": f"the package {package_name} does not have a sdist.",
            }

        similar_packages = self.get_packages(package_name)
        if not similar_packages:
            return HeuristicResult.SKIP, {
                "message": f"the maintainers of {package_name} do not maintain any other packages.",
            }

        for package in similar_packages:
            package_hash = self.get_structure_hash(package)
            if package_hash is None:
                # Name the package so the log line is actionable.
                logger.info("Package %s does not have a sdist.", package)
                continue
            if package_hash == target_hash:
                return HeuristicResult.FAIL, {
                    "similar_package": package,
                }
        return HeuristicResult.PASS, {}

    @staticmethod
    def _normalize_name(package_name: str) -> str:
        """Return the PEP 503 normalized form of a project name (lower case, separator runs become ``-``)."""
        return re.sub(NAME_SEPARATORS, "-", package_name).lower()

    @staticmethod
    def _fetch_page(url: str) -> str | None:
        """Download an HTML page.

        Parameters
        ----------
        url: str
            The URL of the page.

        Returns
        -------
        str | None
            The body of the page, or None if the server did not answer with status 200.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the request itself fails (connection error, timeout, ...).
        """
        try:
            response = requests.get(url, timeout=REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to fetch {url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        if response.status_code != 200:
            return None
        return response.text

    def get_maintainers(self, package_name: str) -> list[str]:
        """Get all maintainers of a package.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]
            The maintainers listed on the project page, or an empty list if the page is not available.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the request to PyPI fails.
        """
        page = self._fetch_page(f"https://pypi.org/project/{package_name}/")
        if page is None:
            return []

        soup = BeautifulSoup(page, "html.parser")
        gravatar_spans = soup.find_all("span", class_="sidebar-section__user-gravatar-text")
        return [span.get_text().strip() for span in gravatar_spans]

    def get_packages_by_user(self, username: str) -> list[str]:
        """Get all packages by a user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        list[str]
            The package names listed on the user page, or an empty list if the page is not available.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the request to PyPI fails.
        """
        page = self._fetch_page(f"https://pypi.org/user/{username}/")
        if page is None:
            return []

        soup = BeautifulSoup(page, "html.parser")
        headers = soup.find_all("h3", class_="package-snippet__title")
        return [header.get_text().strip() for header in headers]

    def get_packages(self, package_name: str) -> list[str]:
        """Get packages that are maintained by this package's maintainers.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]
            The other packages of the maintainers, sorted by name and without the target package.

        Raises
        ------
        HeuristicAnalyzerValueError
            if a request to PyPI fails.
        """
        related: set[str] = set()
        # dict.fromkeys drops repeated maintainers (keeping their order) so each user is queried once.
        for user in dict.fromkeys(self.get_maintainers(package_name)):
            related.update(self.get_packages_by_user(user))

        # PyPI names are case- and separator-insensitive, so the target is removed by normalized
        # name; otherwise it could be compared against itself. Sorting keeps the result, and
        # therefore the reported similar package, deterministic.
        target = self._normalize_name(package_name)
        return sorted(name for name in related if self._normalize_name(name) != target)

    def fetch_sdist_url(self, package_name: str, version: str | None = None) -> str:
        """Fetch the sdist URL for a package.

        Parameters
        ----------
        package_name: str
            The name of the package.
        version: str | None
            The version of the package. If None, the latest version will be used.

        Returns
        -------
        str
            The sdist URL, or an empty string if not found.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the PyPI JSON cannot be fetched, decoded or does not have the expected layout.
        """
        url = f"https://pypi.org/pypi/{package_name}/json"
        try:
            response = requests.get(url, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to fetch PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except ValueError as err:
            err_message = f"Failed to decode PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        actual_version: str
        if version is None:
            try:
                actual_version = typing.cast(str, data["info"]["version"])
            except (KeyError, TypeError) as err:
                err_message = f"Failed to get version for {package_name}: {err}"
                raise HeuristicAnalyzerValueError(err_message) from err
        else:
            actual_version = version

        try:
            for release_file in data.get("releases", {}).get(actual_version, []):
                if isinstance(release_file, dict) and release_file.get("packagetype") == "sdist":
                    sdist_url = release_file.get("url")
                    if isinstance(sdist_url, str):
                        return sdist_url
        except (AttributeError, TypeError) as err:
            # The JSON was valid but "releases" (or the entry of this version) has an unexpected type.
            err_message = f"Failed to parse releases for {package_name} version {actual_version}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        return ""

    @staticmethod
    def _name_pattern(package_name: str) -> "re.Pattern[str] | None":
        """Build a pattern matching every spelling of the project name, or None for an empty name.

        The import package and metadata directories of a sdist usually spell the project name
        differently from PyPI (``my-package`` becomes ``my_package``), so separators and letter
        case are matched loosely.
        """
        parts = [re.escape(part) for part in re.split(NAME_SEPARATORS, package_name) if part]
        if not parts:
            return None
        return re.compile(NAME_SEPARATORS.join(parts), re.IGNORECASE)

    def get_structure_hash(self, package_name: str) -> str | None:
        """Calculate a hash based on the project's file structure.

        The hash covers the sorted paths of all non-directory members of the latest sdist, with the
        top-level directory and every occurrence of the project name removed, so that two projects
        which only differ by name produce the same hash.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        str | None
            The structure hash, or None if the package does not have a sdist.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the sdist URL cannot be fetched or the package structure cannot be hashed.
        """
        sdist_url = self.fetch_sdist_url(package_name)
        if not sdist_url:
            return None

        name_pattern = self._name_pattern(package_name)
        try:
            # The context manager releases the streamed connection even when reading the archive fails.
            with requests.get(sdist_url, stream=True, timeout=REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                raw_file_obj: typing.IO[bytes] = typing.cast(typing.IO[bytes], response.raw)

                paths = []
                with tarfile.open(fileobj=raw_file_obj, mode="r:gz") as file_archive:
                    for member in file_archive:
                        if member.isdir():
                            continue
                        # Remove the top-level directory ("<name>-<version>/").
                        _, separator, remainder = member.name.partition("/")
                        relative_path = remainder if separator else member.name
                        # Remove the project name so that renamed copies hash identically.
                        if name_pattern is not None:
                            relative_path = name_pattern.sub("", relative_path)
                        paths.append(relative_path)

            structure_hash_calculator = hashlib.sha256()
            for path in sorted(paths):
                # tarfile decodes member names with "surrogateescape"; encode them the same way so
                # names that are not valid UTF-8 do not abort the analysis.
                structure_hash_calculator.update(path.encode("utf-8", errors="surrogateescape"))
                structure_hash_calculator.update(b"\n")
            return structure_hash_calculator.hexdigest()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to download sdist for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except tarfile.TarError as err:
            err_message = f"Failed to process tarfile for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except Exception as err:  # pylint: disable=broad-exception-caught
            # Decompression and socket errors raised while iterating the archive are reported
            # as analyzer errors as well.
            err_message = f"Failed to get structure hash for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer +from macaron.malware_analyzer.pypi_heuristics.metadata.similar_projects import SimilarProjectAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence import WheelAbsenceAnalyzer @@ -332,6 +333,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: SuspiciousSetupAnalyzer, WheelAbsenceAnalyzer, AnomalousVersionAnalyzer, + SimilarProjectAnalyzer, ] # name used to query the result of all problog rules, so it can be accessed outside the model. @@ -381,6 +383,10 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: failed({Heuristics.CLOSER_RELEASE_JOIN_DATE.value}), forceSetup. + % Package released that is similar to other packages maintained by the same maintainer. + {Confidence.HIGH.value}::trigger(malware_high_confidence_4) :- + quickUndetailed, forceSetup, failed({Heuristics.SIMILAR_PROJECTS.value}). + % Package released recently with little detail, with multiple releases as a trust marker, but frequent and with % the same code. {Confidence.MEDIUM.value}::trigger(malware_medium_confidence_1) :- @@ -401,6 +407,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: {problog_result_access} :- trigger(malware_high_confidence_1). {problog_result_access} :- trigger(malware_high_confidence_2). {problog_result_access} :- trigger(malware_high_confidence_3). + {problog_result_access} :- trigger(malware_high_confidence_4). {problog_result_access} :- trigger(malware_medium_confidence_2). {problog_result_access} :- trigger(malware_medium_confidence_1). query({problog_result_access}). 
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for the SimilarProjectAnalyzer heuristic."""
# pylint: disable=redefined-outer-name

import hashlib
from unittest.mock import MagicMock, patch

import pytest

from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.similar_projects import SimilarProjectAnalyzer


@pytest.fixture()
def analyzer() -> SimilarProjectAnalyzer:
    """Pytest fixture to create a SimilarProjectAnalyzer instance."""
    return SimilarProjectAnalyzer()


def _json_response(payload: dict) -> MagicMock:
    """Create a mocked successful response whose JSON body is the given payload."""
    response = MagicMock()
    response.status_code = 200
    response.json.return_value = payload
    return response


def _tar_member(name: str, is_dir: bool = False) -> MagicMock:
    """Create a mocked tar member with the given path."""
    member = MagicMock()
    # "name" is a reserved MagicMock constructor argument, so it is assigned afterwards.
    member.name = name
    member.isdir.return_value = is_dir
    return member


def test_analyze_skip_no_similar_packages(analyzer: SimilarProjectAnalyzer, pypi_package_json: MagicMock) -> None:
    """Test the analyzer skips when the maintainers of the package do not maintain any other packages."""
    pypi_package_json.component_name = "test_package"
    with (
        patch.object(SimilarProjectAnalyzer, "get_structure_hash") as mock_get_structure_hash,
        patch.object(SimilarProjectAnalyzer, "get_packages") as mock_get_packages,
    ):
        mock_get_structure_hash.return_value = "dummy_hash"
        mock_get_packages.return_value = []
        result, info = analyzer.analyze(pypi_package_json)
        assert result == HeuristicResult.SKIP
        assert "the maintainers of test_package do not maintain any other packages." in str(info["message"])


def test_analyze_fail_similar_project_found(analyzer: SimilarProjectAnalyzer, pypi_package_json: MagicMock) -> None:
    """Test the analyzer fails when a similar project with the same structure hash is found."""
    pypi_package_json.component_name = "test_package"
    with (
        patch.object(SimilarProjectAnalyzer, "get_structure_hash") as mock_get_structure_hash,
        patch.object(SimilarProjectAnalyzer, "get_packages") as mock_get_packages,
    ):
        mock_get_structure_hash.return_value = "same_hash"
        mock_get_packages.return_value = ["similar_package"]
        result, info = analyzer.analyze(pypi_package_json)
        assert result == HeuristicResult.FAIL
        assert info["similar_package"] == "similar_package"


def test_analyze_pass_no_similar_hash(analyzer: SimilarProjectAnalyzer, pypi_package_json: MagicMock) -> None:
    """Test the analyzer passes when no similar project has the same structure hash."""
    pypi_package_json.component_name = "test_package"
    with (
        patch.object(SimilarProjectAnalyzer, "get_structure_hash") as mock_get_structure_hash,
        patch.object(SimilarProjectAnalyzer, "get_packages") as mock_get_packages,
    ):
        mock_get_structure_hash.side_effect = ["mock_hash1", "mock_hash2"]
        mock_get_packages.return_value = ["similar_package"]
        result, info = analyzer.analyze(pypi_package_json)
        assert result == HeuristicResult.PASS
        assert info == {}


def test_get_maintainers_failure(analyzer: SimilarProjectAnalyzer) -> None:
    """Test get_maintainers method with a failed response."""
    # The response is mocked so the test never depends on the network.
    with patch("requests.get") as mock_get:
        mock_get.return_value = MagicMock(status_code=404)
        maintainers = analyzer.get_maintainers("test_package")
        assert maintainers == []
        mock_get.assert_called_once()


def test_get_packages_by_user_failure(analyzer: SimilarProjectAnalyzer) -> None:
    """Test get_packages_by_user method with a failed response."""
    # The response is mocked so the test never depends on the network.
    with patch("requests.get") as mock_get:
        mock_get.return_value = MagicMock(status_code=404)
        packages = analyzer.get_packages_by_user("test_user")
        assert packages == []
        mock_get.assert_called_once()


def test_get_packages(analyzer: SimilarProjectAnalyzer) -> None:
    """Test get_packages method."""
    with (
        patch.object(SimilarProjectAnalyzer, "get_maintainers") as mock_get_maintainers,
        patch.object(SimilarProjectAnalyzer, "get_packages_by_user") as mock_get_packages_by_user,
    ):
        mock_get_maintainers.return_value = ["user1", "user2"]
        mock_get_packages_by_user.side_effect = [["package1", "package2"], ["package2", "package3"]]
        packages = analyzer.get_packages("test_package")
        assert set(packages) == {"package1", "package2", "package3"}


def test_get_packages_excludes_self(analyzer: SimilarProjectAnalyzer) -> None:
    """Test get_packages method excludes the package itself."""
    with (
        patch.object(SimilarProjectAnalyzer, "get_maintainers") as mock_get_maintainers,
        patch.object(SimilarProjectAnalyzer, "get_packages_by_user") as mock_get_packages_by_user,
    ):
        mock_get_maintainers.return_value = ["user1"]
        mock_get_packages_by_user.return_value = ["test_package", "package1"]
        packages = analyzer.get_packages("test_package")
        assert packages == ["package1"]


def test_fetch_sdist_url_success(analyzer: SimilarProjectAnalyzer) -> None:
    """Test fetch_sdist_url method with a successful response."""
    with patch("requests.get") as mock_get:
        mock_get.return_value = _json_response(
            {
                "info": {"version": "1.0.0"},
                "releases": {"1.0.0": [{"packagetype": "sdist", "url": "http://example.com/sdist.tar.gz"}]},
            }
        )
        url = analyzer.fetch_sdist_url("test_package")
        assert url == "http://example.com/sdist.tar.gz"


def test_fetch_sdist_url_no_sdist(analyzer: SimilarProjectAnalyzer) -> None:
    """Test fetch_sdist_url method when no sdist is found."""
    with patch("requests.get") as mock_get:
        mock_get.return_value = _json_response(
            {
                "info": {"version": "1.0.0"},
                "releases": {"1.0.0": [{"packagetype": "bdist", "url": "http://example.com/bdist.whl"}]},
            }
        )
        url = analyzer.fetch_sdist_url("test_package")
        assert url == ""


def test_fetch_sdist_url_version_specified(analyzer: SimilarProjectAnalyzer) -> None:
    """Test fetch_sdist_url method with a specific version."""
    with patch("requests.get") as mock_get:
        mock_get.return_value = _json_response(
            {
                "info": {"version": "1.0.0"},
                "releases": {
                    "0.9.0": [{"packagetype": "sdist", "url": "http://example.com/sdist_0.9.tar.gz"}],
                    "1.0.0": [{"packagetype": "sdist", "url": "http://example.com/sdist.tar.gz"}],
                },
            }
        )
        url = analyzer.fetch_sdist_url("test_package", version="0.9.0")
        assert url == "http://example.com/sdist_0.9.tar.gz"


def test_get_structure_hash(analyzer: SimilarProjectAnalyzer) -> None:
    """Test get_structure_hash method."""
    with (
        patch.object(SimilarProjectAnalyzer, "fetch_sdist_url") as mock_fetch_sdist_url,
        patch("requests.get") as mock_get,
        patch("tarfile.open") as mock_tarfile_open,
    ):
        mock_fetch_sdist_url.return_value = "http://example.com/sdist.tar.gz"
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.raw = MagicMock()
        mock_get.return_value = mock_response

        mock_tarfile = MagicMock()
        mock_tarfile.__iter__.return_value = [
            _tar_member("test_package-1.0.0/file1.py"),
            _tar_member("test_package-1.0.0/dir/file2.py"),
            _tar_member("test_package-1.0.0/dir", is_dir=True),
        ]
        # The archive is used as a context manager, so the members must be reachable through __enter__.
        mock_tarfile_open.return_value.__enter__.return_value = mock_tarfile

        structure_hash = analyzer.get_structure_hash("test_package")

        # Directories are skipped, the top-level directory is removed and the paths are sorted.
        expected_hash = hashlib.sha256(b"dir/file2.py\nfile1.py\n").hexdigest()
        assert structure_hash == expected_hash