diff --git a/vanir/detector_common_flags.py b/vanir/detector_common_flags.py index 8d3f32c..301daa4 100644 --- a/vanir/detector_common_flags.py +++ b/vanir/detector_common_flags.py @@ -19,6 +19,7 @@ from absl import flags import dateutil.relativedelta +from vanir import vulnerability from vanir import vulnerability_manager from vanir import vulnerability_overwriter from vanir.scanners import scanner_base @@ -144,6 +145,14 @@ '(subjected to other flags). This flag can be specified multiple times.' ) +_IGNORE_OTHER_PACKAGE_VERSIONS = flags.DEFINE_bool( + 'ignore_other_package_versions', False, + 'If True, only signatures for the versions specified under package_version' + 'are used. This includes all version-specific signatures and signatures' + 'listed under an affected entry with that version. In case no signatures' + 'exist for these versions, all signatures are used instead.' +) + _OVERWRITE_SPECS = flags.DEFINE_string( 'overwrite_specs', None, @@ -330,6 +339,7 @@ def generate_vuln_manager_from_flags( def generate_finding_filters_from_flags( + vulnerabilities: Sequence[vulnerability.Vulnerability] ) -> Sequence[scanner_base.FindingsFilter]: """Parses flags related to finding filters and return the list of filters.""" filters = [] @@ -338,5 +348,16 @@ def generate_finding_filters_from_flags( scanner_base.PathPrefixFilter(path) for path in _IGNORE_SCAN_PATHS.value ) versions = _PACKAGE_VERSIONS.value if _PACKAGE_VERSIONS.value else [] - filters.append(scanner_base.PackageVersionSpecificSignatureFilter(versions)) + if not versions and _IGNORE_OTHER_PACKAGE_VERSIONS.value: + raise ValueError( + 'No versions specified in "package-versions",' + 'although ignore_other_package_versions is set to true.' + ) + filters.append( + scanner_base.PackageVersionSpecificSignatureFilter( + versions, + _IGNORE_OTHER_PACKAGE_VERSIONS.value, + vulnerabilities, + ) + ) return filters diff --git a/vanir/detector_common_flags_test.py b/vanir/detector_common_flags_test.py index cab8113..1a89377 100644 --- a/vanir/detector_common_flags_test.py +++ b/vanir/detector_common_flags_test.py @@ -228,7 +228,7 @@ def test_generate_vulnerability_filters_from_flags_ignores_low_severity(self): @flagsaver.flagsaver(ignore_scan_path=['path1', 'path2/3']) def test_generate_scan_path_finding_filters_from_flags(self): - filters = detector_common_flags.generate_finding_filters_from_flags() + filters = detector_common_flags.generate_finding_filters_from_flags([]) self.assertLen(filters, 3) self.assertIsInstance(filters[0], scanner_base.PathPrefixFilter) self.assertIsInstance(filters[1], scanner_base.PathPrefixFilter) @@ -238,7 +238,7 @@ def test_generate_scan_path_finding_filters_from_flags(self): @flagsaver.flagsaver(package_version=['1', '2']) def test_generate_version_finding_filters_from_flags(self): - filters = detector_common_flags.generate_finding_filters_from_flags() + filters = detector_common_flags.generate_finding_filters_from_flags([]) self.assertLen(filters, 1) self.assertIsInstance( filters[0], scanner_base.PackageVersionSpecificSignatureFilter diff --git a/vanir/detector_runner.py b/vanir/detector_runner.py index 7710525..00e1ff6 100644 --- a/vanir/detector_runner.py +++ b/vanir/detector_runner.py @@ -528,7 +528,9 @@ def main(argv: Sequence[str]) -> None: ) finding_filters = ( [scanner_base.ShortFunctionFilter()] - + list(detector_common_flags.generate_finding_filters_from_flags()) + + list(detector_common_flags.generate_finding_filters_from_flags( + vuln_manager.get_vulnerabilities()) + ) ) findings = scanner_base.ShortFunctionFilter().filter(findings) for finding_filter in finding_filters: diff --git a/vanir/scanners/scanner_base.py b/vanir/scanners/scanner_base.py index 008628a..3d97ef0 100644 --- a/vanir/scanners/scanner_base.py +++ b/vanir/scanners/scanner_base.py @@ -148,34 +148,74 @@ def filter(self, findings: Findings) -> Findings: class PackageVersionSpecificSignatureFilter(FindingsFilter): """Removes findings from version-specific signatures not matching given versions.""" - def __init__(self, versions: Collection[str]): + def __init__( + self, + versions: Collection[str], + ignore_other_package_versions: bool=False, + vulnerabilities: Sequence[vulnerability.Vulnerability]=[] + ): self._package_versions = frozenset(versions) + self._ignore_other_package_versions = ignore_other_package_versions + self._versions_per_CVE = self._get_package_versions_per_CVE(vulnerabilities) def filter(self, findings: Findings) -> Findings: filtered_findings = {} for sig, chunks in findings.items(): - # If the signature is not version-specific, keep. - if not sig.match_only_versions: - filtered_findings[sig] = chunks - continue - # If the signature's versions overlay with the package's versions, keep. - if set(sig.match_only_versions) & self._package_versions: - filtered_findings[sig] = chunks - continue - # If the signature has "X-next" listed and the package's version is newer - # than X, keep. Note that this versioning scheme is currently only used by - # Android; there are plans for a more generic approach in the future. - next_vers = [v for v in sig.match_only_versions if v.endswith('-next')] - # We are using string comparison for versioning; there is plan to - # incorporate OSV's SemVer comparison library in the future. + CVE_id = sig.signature_id.removesuffix('-' + sig.signature_id.split('-')[-1]) + + # Make sure there is at least one patch for the specified versions + # in case we want to ignore other package versions if ( - next_vers and - any(ver > min(next_vers) for ver in self._package_versions) + self._ignore_other_package_versions + and (self._versions_per_CVE[CVE_id] & self._package_versions) ): - filtered_findings[sig] = chunks - # Otherwise, filter out. + # Only keep signatures for a CVE that are in self._package_versions + if( + (set(sig.affected_entry_versions) & self._package_versions) or + (sig.match_only_versions and (set(sig.match_only_versions) & self._package_versions)) + ): + filtered_findings[sig] = chunks + else: + logging.debug(f'Ignoring {sig} due to version mismatch') + else: + # If the signature is not version-specific, keep. + if not sig.match_only_versions: + filtered_findings[sig] = chunks + continue + # If the signature's versions overlay with the package's versions, keep. + if set(sig.match_only_versions) & self._package_versions: + filtered_findings[sig] = chunks + continue + # If the signature has "X-next" listed and the package's version is newer + # than X, keep. Note that this versioning scheme is currently only used by + # Android; there are plans for a more generic approach in the future. + next_vers = [v for v in sig.match_only_versions if v.endswith('-next')] + # We are using string comparison for versioning; there is plan to + # incorporate OSV's SemVer comparison library in the future. + if ( + next_vers and + any(ver > min(next_vers) for ver in self._package_versions) + ): + filtered_findings[sig] = chunks + # Otherwise, filter out. return filtered_findings + def _get_package_versions_per_CVE( + self, + vulnerabilities: Sequence[vulnerability.Vulnerability] + ) -> Mapping[str, str]: + """Parse package version for all vulnerability affected entries.""" + mappings = {} + for vuln in vulnerabilities: + ids = [vuln.id] + if vuln.aliases: + ids.extend(vuln.aliases) + for vuln_id in ids: + mappings[vuln_id] = set() + for affected_entry in vuln.affected: + for vuln_id in ids: + mappings[vuln_id].update(affected_entry.versions) + return mappings @dataclasses.dataclass(frozen=True) class ScannedFileStats: diff --git a/vanir/signature.py b/vanir/signature.py index 684dac7..9509a41 100644 --- a/vanir/signature.py +++ b/vanir/signature.py @@ -149,6 +149,7 @@ class Signature(metaclass=abc.ABCMeta): to identify the target file. See the Truncated Path module for details. signature_id_prefix: Prepended to the signature hash to create the globally unique ID of the signature. If not given, signature_id will be invalid. + affected_entry_versions: Versions of the affected entry of the signature """ signature_id: str signature_version: str @@ -158,6 +159,7 @@ class Signature(metaclass=abc.ABCMeta): exact_target_file_match_only: bool match_only_versions: Optional[FrozenSet[str]] truncated_path_level: Optional[int] + affected_entry_versions: Optional[Sequence[str]] @property @abc.abstractmethod @@ -222,6 +224,11 @@ def from_osv_dict(cls, osv_dict: dict[str, Any]) -> Self: function_hash=int(osv_dict['digest']['function_hash']), length=int(osv_dict['digest']['length']), target_function=osv_dict['target']['function'], + affected_entry_versions=( + frozenset(osv_dict['affected_entry_versions']) + if 'affected_entry_versions' in osv_dict + else None + ), ) elif sig_type is SignatureType.LINE_SIGNATURE: sign = LineSignature( @@ -241,6 +248,11 @@ def from_osv_dict(cls, osv_dict: dict[str, Any]) -> Self: truncated_path_level=_get_truncated_path_level(osv_dict), line_hashes=[int(h) for h in osv_dict['digest']['line_hashes']], threshold=osv_dict['digest']['threshold'], + affected_entry_versions=( + frozenset(osv_dict['affected_entry_versions']) + if 'affected_entry_versions' in osv_dict + else None + ), ) else: raise ValueError(f'Signature type {sig_type} is unknown.') diff --git a/vanir/vulnerability.py b/vanir/vulnerability.py index eed28e4..aac5934 100644 --- a/vanir/vulnerability.py +++ b/vanir/vulnerability.py @@ -87,6 +87,8 @@ def __init__( original_signatures = self.ecosystem_specific[OSV_VANIR_SIGNATURES] else: original_signatures = [] + for sig in original_signatures: + sig["affected_entry_versions"] = self.versions self.vanir_signatures = [ sig if isinstance(sig, signature.Signature) else signature.Signature.from_osv_dict(sig)