Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion vanir/detector_common_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from absl import flags
import dateutil.relativedelta
from vanir import vulnerability
from vanir import vulnerability_manager
from vanir import vulnerability_overwriter
from vanir.scanners import scanner_base
Expand Down Expand Up @@ -144,6 +145,14 @@
'(subjected to other flags). This flag can be specified multiple times.'
)

_IGNORE_OTHER_PACKAGE_VERSIONS = flags.DEFINE_bool(
'ignore_other_package_versions', False,
'If True, only signatures for the versions specified under package_version'
'are used. This includes all version-specific signatures and signatures'
'listed under an affected entry with that version. In case no signatures'
'exist for these versions, all signatures are used instead.'
)

_OVERWRITE_SPECS = flags.DEFINE_string(
'overwrite_specs',
None,
Expand Down Expand Up @@ -330,6 +339,7 @@ def generate_vuln_manager_from_flags(


def generate_finding_filters_from_flags(
vulnerabilities: Sequence[vulnerability.Vulnerability]
) -> Sequence[scanner_base.FindingsFilter]:
"""Parses flags related to finding filters and return the list of filters."""
filters = []
Expand All @@ -338,5 +348,16 @@ def generate_finding_filters_from_flags(
scanner_base.PathPrefixFilter(path) for path in _IGNORE_SCAN_PATHS.value
)
versions = _PACKAGE_VERSIONS.value if _PACKAGE_VERSIONS.value else []
filters.append(scanner_base.PackageVersionSpecificSignatureFilter(versions))
if not versions and _IGNORE_OTHER_PACKAGE_VERSIONS.value:
raise ValueError(
'No versions specified in "package-versions",'
'although ignore_other_package_versions is set to true.'
)
filters.append(
scanner_base.PackageVersionSpecificSignatureFilter(
versions,
_IGNORE_OTHER_PACKAGE_VERSIONS.value,
vulnerabilities,
)
)
return filters
4 changes: 2 additions & 2 deletions vanir/detector_common_flags_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def test_generate_vulnerability_filters_from_flags_ignores_low_severity(self):

@flagsaver.flagsaver(ignore_scan_path=['path1', 'path2/3'])
def test_generate_scan_path_finding_filters_from_flags(self):
filters = detector_common_flags.generate_finding_filters_from_flags()
filters = detector_common_flags.generate_finding_filters_from_flags([])
self.assertLen(filters, 3)
self.assertIsInstance(filters[0], scanner_base.PathPrefixFilter)
self.assertIsInstance(filters[1], scanner_base.PathPrefixFilter)
Expand All @@ -238,7 +238,7 @@ def test_generate_scan_path_finding_filters_from_flags(self):

@flagsaver.flagsaver(package_version=['1', '2'])
def test_generate_version_finding_filters_from_flags(self):
filters = detector_common_flags.generate_finding_filters_from_flags()
filters = detector_common_flags.generate_finding_filters_from_flags([])
self.assertLen(filters, 1)
self.assertIsInstance(
filters[0], scanner_base.PackageVersionSpecificSignatureFilter
Expand Down
4 changes: 3 additions & 1 deletion vanir/detector_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,9 @@ def main(argv: Sequence[str]) -> None:
)
finding_filters = (
[scanner_base.ShortFunctionFilter()]
+ list(detector_common_flags.generate_finding_filters_from_flags())
+ list(detector_common_flags.generate_finding_filters_from_flags(
vuln_manager.get_vulnerabilities())
)
)
findings = scanner_base.ShortFunctionFilter().filter(findings)
for finding_filter in finding_filters:
Expand Down
78 changes: 59 additions & 19 deletions vanir/scanners/scanner_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,34 +148,74 @@ def filter(self, findings: Findings) -> Findings:
class PackageVersionSpecificSignatureFilter(FindingsFilter):
"""Removes findings from version-specific signatures not matching given versions."""

def __init__(self, versions: Collection[str]):
def __init__(
self,
versions: Collection[str],
ignore_other_package_versions: bool=False,
vulnerabilities: Sequence[vulnerability.Vulnerability]=[]
):
self._package_versions = frozenset(versions)
self._ignore_other_package_versions = ignore_other_package_versions
self._versions_per_CVE = self._get_package_versions_per_CVE(vulnerabilities)

def filter(self, findings: Findings) -> Findings:
filtered_findings = {}
for sig, chunks in findings.items():
# If the signature is not version-specific, keep.
if not sig.match_only_versions:
filtered_findings[sig] = chunks
continue
# If the signature's versions overlay with the package's versions, keep.
if set(sig.match_only_versions) & self._package_versions:
filtered_findings[sig] = chunks
continue
# If the signature has "X-next" listed and the package's version is newer
# than X, keep. Note that this versioning scheme is currently only used by
# Android; there are plans for a more generic approach in the future.
next_vers = [v for v in sig.match_only_versions if v.endswith('-next')]
# We are using string comparison for versioning; there is plan to
# incorporate OSV's SemVer comparison library in the future.
CVE_id = sig.signature_id.removesuffix('-' + sig.signature_id.split('-')[-1])

# Make sure there is at least one patch for the specified versions
# in case we want to ignore other package versions
if (
next_vers and
any(ver > min(next_vers) for ver in self._package_versions)
self._ignore_other_package_versions
and (self._versions_per_CVE[CVE_id] & self._package_versions)
):
filtered_findings[sig] = chunks
# Otherwise, filter out.
# Only keep signatures for a CVE that are in self._package_versions
if(
(set(sig.affected_entry_versions) & self._package_versions) or
(sig.match_only_versions and (set(sig.match_only_versions) & self._package_versions))
):
filtered_findings[sig] = chunks
else:
logging.debug(f'Ignoring {sig} due to version mismatch')
else:
# If the signature is not version-specific, keep.
if not sig.match_only_versions:
filtered_findings[sig] = chunks
continue
# If the signature's versions overlay with the package's versions, keep.
if set(sig.match_only_versions) & self._package_versions:
filtered_findings[sig] = chunks
continue
# If the signature has "X-next" listed and the package's version is newer
# than X, keep. Note that this versioning scheme is currently only used by
# Android; there are plans for a more generic approach in the future.
next_vers = [v for v in sig.match_only_versions if v.endswith('-next')]
# We are using string comparison for versioning; there is plan to
# incorporate OSV's SemVer comparison library in the future.
if (
next_vers and
any(ver > min(next_vers) for ver in self._package_versions)
):
filtered_findings[sig] = chunks
# Otherwise, filter out.
return filtered_findings

def _get_package_versions_per_CVE(
self,
vulnerabilities: Sequence[vulnerability.Vulnerability]
) -> Mapping[str, str]:
"""Parse package version for all vulnerability affected entries."""
mappings = {}
for vuln in vulnerabilities:
ids = [vuln.id]
if vuln.aliases:
ids.extend(vuln.aliases)
for vuln_id in ids:
mappings[vuln_id] = set()
for affected_entry in vuln.affected:
for vuln_id in ids:
mappings[vuln_id].update(affected_entry.versions)
return mappings

@dataclasses.dataclass(frozen=True)
class ScannedFileStats:
Expand Down
12 changes: 12 additions & 0 deletions vanir/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class Signature(metaclass=abc.ABCMeta):
to identify the target file. See the Truncated Path module for details.
signature_id_prefix: Prepended to the signature hash to create the globally
unique ID of the signature. If not given, signature_id will be invalid.
affected_entry_versions: Versions of the affected entry of the signature
"""
signature_id: str
signature_version: str
Expand All @@ -158,6 +159,7 @@ class Signature(metaclass=abc.ABCMeta):
exact_target_file_match_only: bool
match_only_versions: Optional[FrozenSet[str]]
truncated_path_level: Optional[int]
affected_entry_versions: Optional[Sequence[str]]

@property
@abc.abstractmethod
Expand Down Expand Up @@ -222,6 +224,11 @@ def from_osv_dict(cls, osv_dict: dict[str, Any]) -> Self:
function_hash=int(osv_dict['digest']['function_hash']),
length=int(osv_dict['digest']['length']),
target_function=osv_dict['target']['function'],
affected_entry_versions=(
frozenset(osv_dict['affected_entry_versions'])
if 'affected_entry_versions' in osv_dict
else None
),
)
elif sig_type is SignatureType.LINE_SIGNATURE:
sign = LineSignature(
Expand All @@ -241,6 +248,11 @@ def from_osv_dict(cls, osv_dict: dict[str, Any]) -> Self:
truncated_path_level=_get_truncated_path_level(osv_dict),
line_hashes=[int(h) for h in osv_dict['digest']['line_hashes']],
threshold=osv_dict['digest']['threshold'],
affected_entry_versions=(
frozenset(osv_dict['affected_entry_versions'])
if 'affected_entry_versions' in osv_dict
else None
),
)
else:
raise ValueError(f'Signature type {sig_type} is unknown.')
Expand Down
2 changes: 2 additions & 0 deletions vanir/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def __init__(
original_signatures = self.ecosystem_specific[OSV_VANIR_SIGNATURES]
else:
original_signatures = []
for sig in original_signatures:
sig["affected_entry_versions"] = self.versions
self.vanir_signatures = [
sig if isinstance(sig, signature.Signature) else
signature.Signature.from_osv_dict(sig)
Expand Down