|
| 1 | +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. |
| 2 | +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. |
| 3 | + |
| 4 | +"""The heuristic analyzer to check the email address of the package maintainers.""" |
| 5 | + |
| 6 | +import logging |
| 7 | +import re |
| 8 | + |
| 9 | +from email_validator import EmailNotValidError, ValidatedEmail, validate_email |
| 10 | + |
| 11 | +from macaron.config.defaults import defaults |
| 12 | +from macaron.errors import HeuristicAnalyzerValueError |
| 13 | +from macaron.json_tools import JsonType, json_extract |
| 14 | +from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer |
| 15 | +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics |
| 16 | +from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset |
| 17 | + |
| 18 | +logger: logging.Logger = logging.getLogger(__name__) |
| 19 | + |
| 20 | + |
class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    The ``author_email`` and ``maintainer_email`` fields of the PyPI package
    JSON are scanned for email addresses, and every address found is validated
    with ``email_validator``. The heuristic fails as soon as a populated field
    contains no address or an address is rejected by the validator.
    """

    # Extracts address-like substrings from a free-form metadata field.
    # This is only a pre-filter; ``validate_email`` makes the final decision.
    #
    # The local part accepts ``_``, ``%``, ``+`` and ``-`` in addition to
    # alphanumerics, and domain labels accept interior hyphens. A pattern
    # restricted to alphanumerics would miss addresses such as
    # ``first_last@example.com`` or ``me@my-domain.com`` entirely, and would
    # truncate ``first-last@example.com`` to ``last@example.com``.
    PATTERN: re.Pattern[str] = re.compile(
        r"""(?<![A-Za-z0-9._%+-])                   # do not start in the middle of a local part
        [A-Za-z0-9_%+-]+                        # first local-part segment
        (?:\.[A-Za-z0-9_%+-]+)*                 # optional ".segment" repeats
        @
        [A-Za-z0-9]+(?:-+[A-Za-z0-9]+)*         # domain label (hyphens only between alphanumerics)
        (?:\.[A-Za-z0-9]+(?:-+[A-Za-z0-9]+)*)*  # optional further labels
        \.[A-Za-z]{2,}                          # top-level domain (at least 2 letters)
        \b""",
        re.VERBOSE,
    )

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )
        # Forwarded to ``validate_email`` as its ``check_deliverability`` argument.
        self.check_deliverability: bool = self._load_defaults()

    def _load_defaults(self) -> bool:
        """Load the default values from defaults.ini.

        Returns
        -------
        bool
            The ``check_deliverability`` option of the ``heuristic.pypi`` section,
            or True when the section or the option is absent.
        """
        section_name = "heuristic.pypi"
        if defaults.has_section(section_name):
            section = defaults[section_name]
            return section.getboolean("check_deliverability", fallback=True)
        return True

    def get_emails(self, email_field: str) -> list[str]:
        """Extract emails from the given email field.

        Parameters
        ----------
        email_field: str
            The email field from which to extract emails.

        Returns
        -------
        list[str]
            A list of emails extracted from the email field, in order of
            appearance. The list is empty when nothing matches ``PATTERN``.
        """
        # PATTERN has no capturing groups, so findall yields the full matches;
        # a match can never contain whitespace, so no stripping is required.
        return self.PATTERN.findall(email_field)

    def is_valid_email(self, email: str) -> ValidatedEmail | None:
        """Check if the email address is accepted by ``email_validator``.

        Deliverability of the address is additionally checked by the validator
        when ``self.check_deliverability`` is True.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        ValidatedEmail | None
            The validated email object if the email is valid, otherwise None.
        """
        try:
            return validate_email(email, check_deliverability=self.check_deliverability)
        except EmailNotValidError as err:
            # A rejected address is an expected outcome, not an error: report it and
            # let the caller turn the None into a heuristic failure.
            logger.warning("Invalid email address: %s. Error: %s", email, err)
            return None

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            SKIP when neither email field is populated, FAIL when a populated field
            contains no address or contains an invalid address, PASS otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the package JSON has no ``info`` entry.
        """
        package_json = pypi_package_json.package_json
        if not package_json.get("info", {}):
            raise HeuristicAnalyzerValueError("No package info available.")

        author_email = json_extract(package_json, ["info", "author_email"], str)
        maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

        if not author_email and not maintainer_email:
            return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

        validated_emails: list[JsonType] = []
        # Attributes of the ValidatedEmail object that are copied into the result.
        details = ["normalized", "local_part", "domain"]

        for email_field in (author_email, maintainer_email):
            if not email_field:
                # Only one of the two fields has to be populated.
                continue

            emails = self.get_emails(email_field)
            if not emails:
                return HeuristicResult.FAIL, {"message": "no emails found in the email field"}

            for email in emails:
                email_info = self.is_valid_email(email)
                if not email_info:
                    return HeuristicResult.FAIL, {"invalid_email": email}

                validated_emails.append({key: getattr(email_info, key) for key in details})

        return HeuristicResult.PASS, {"validated_emails": validated_emails}
0 commit comments