From c8c07c2e3ddcab15eed1f11dbee204fd55b4763b Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Tue, 14 Apr 2026 09:36:49 +0200 Subject: [PATCH] Added --url parameter to "pysigma update-cache" subcommand --- sigma/cli/pysigma.py | 102 ++++++++++++++++++++++++-------------- tests/test_pysigma.py | 112 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 173 insertions(+), 41 deletions(-) diff --git a/sigma/cli/pysigma.py b/sigma/cli/pysigma.py index 35c3554..9e77050 100644 --- a/sigma/cli/pysigma.py +++ b/sigma/cli/pysigma.py @@ -7,6 +7,52 @@ from packaging.specifiers import SpecifierSet from prettytable import PrettyTable + +def get_cache_datasets(): + from sigma.data import mitre_attack, mitre_d3fend + + return [ + { + 'key': 'mitre_attack', + 'name': 'MITRE ATT&CK', + 'module': mitre_attack, + 'cache_key': 'mitre_attack_data_default', + 'version_key': 'mitre_attack_version', + 'trigger_attr': 'mitre_attack_techniques_tactics_mapping' + }, + { + 'key': 'mitre_d3fend', + 'name': 'MITRE D3FEND', + 'module': mitre_d3fend, + 'cache_key': 'mitre_d3fend_data_default', + 'version_key': 'mitre_d3fend_version', + 'trigger_attr': 'mitre_d3fend_techniques' + } + ] + + +def parse_dataset_url_overrides(urls, datasets): + dataset_names = {dataset['key'] for dataset in datasets} + overrides = {} + + for url_override in urls: + dataset_name, separator, dataset_url = url_override.partition(":") + if not separator or not dataset_name or not dataset_url: + raise click.BadParameter( + "must use the format dataset:url", + param_hint="--url", + ) + + if dataset_name not in dataset_names: + raise click.BadParameter( + f"unknown dataset '{dataset_name}'. Available datasets: {', '.join(sorted(dataset_names))}", + param_hint="--url", + ) + + overrides[dataset_name] = dataset_url + + return overrides + def get_pysigma_requirement(): requires = importlib.metadata.requires("sigma-cli") return [ @@ -81,23 +127,7 @@ def check_pysigma(quiet=False): def list_cache_command(): """List the cached versions of pySigma data and their timestamps.""" try: - from sigma.data import mitre_attack, mitre_d3fend - - # Configuration for datasets to check - datasets = [ - { - 'name': 'MITRE ATT&CK', - 'module': mitre_attack, - 'cache_key': 'mitre_attack_data_default', - 'version_key': 'mitre_attack_version' - }, - { - 'name': 'MITRE D3FEND', - 'module': mitre_d3fend, - 'cache_key': 'mitre_d3fend_data_default', - 'version_key': 'mitre_d3fend_version' - } - ] + datasets = get_cache_datasets() table = PrettyTable() table.field_names = ["Dataset", "Version", "Cached Date"] @@ -146,12 +176,7 @@ def list_cache_command(): def clear_cache_command(yes): """Delete the cached data for all datasets.""" try: - from sigma.data import mitre_attack, mitre_d3fend - - datasets = [ - {'name': 'MITRE ATT&CK', 'module': mitre_attack}, - {'name': 'MITRE D3FEND', 'module': mitre_d3fend} - ] + datasets = get_cache_datasets() # Check what's cached cached_datasets = [] @@ -215,23 +240,22 @@ def clear_cache_command(yes): is_flag=True, help="Skip confirmation prompt.", ) -def update_cache_command(yes): +@click.option( + "--url", + "urls", + multiple=True, + help="Override a dataset source in the format dataset:url.", +) +def update_cache_command(yes, urls): """Update the cache by deleting it and re-caching data for all datasets.""" try: - from sigma.data import mitre_attack, mitre_d3fend - - datasets = [ - { - 'name': 'MITRE ATT&CK', - 'module': mitre_attack, - 'trigger_attr': 'mitre_attack_techniques_tactics_mapping' - }, - { - 'name': 'MITRE D3FEND', - 'module': mitre_d3fend, - 'trigger_attr': 'mitre_d3fend_techniques' - } - ] + datasets = get_cache_datasets() + url_overrides = parse_dataset_url_overrides(urls, datasets) + + for dataset in datasets: + dataset_url = url_overrides.get(dataset['key']) + if dataset_url is not None: + dataset['module'].set_url(dataset_url) # Get current cache info cached_datasets = [] @@ -298,5 +322,7 @@ def update_cache_command(yes): except ImportError: click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red")) click.echo("Make sure pySigma is installed correctly.") + except click.ClickException: + raise except Exception as e: click.echo(click.style(f"Error updating cache: {str(e)}", fg="red")) \ No newline at end of file diff --git a/tests/test_pysigma.py b/tests/test_pysigma.py index eeac0ab..60688fd 100644 --- a/tests/test_pysigma.py +++ b/tests/test_pysigma.py @@ -1,5 +1,7 @@ import importlib import re +import sys +import types from sigma.cli.pysigma import pysigma_group, check_pysigma_version from click.testing import CliRunner import pytest @@ -68,7 +70,7 @@ def test_clear_cache_with_confirmation_cancel(): cli = CliRunner() result = cli.invoke(pysigma_group, ["clear-cache"], input="n\n") assert result.exit_code == 0 - assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output + assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "nothing to clear" in result.output.lower() or "No cache directory found" in result.output def test_clear_cache_with_yes_flag(): @@ -76,7 +78,7 @@ def test_clear_cache_with_yes_flag(): cli = CliRunner() result = cli.invoke(pysigma_group, ["clear-cache", "-y"]) assert result.exit_code == 0 - assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output + assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "nothing to clear" in result.output.lower() or "No cache directory found" in result.output def test_update_cache_help(): @@ -86,6 +88,7 @@ def test_update_cache_help(): assert result.exit_code == 0 assert "Update cache" in result.output assert "--yes" in result.output or "-y" in result.output + assert "--url" in result.output def test_update_cache_with_confirmation_cancel(): @@ -93,4 +96,107 @@ def test_update_cache_with_confirmation_cancel(): cli = CliRunner() result = cli.invoke(pysigma_group, ["update-cache"], input="n\n") assert result.exit_code == 0 - assert "cancelled" in result.output.lower() \ No newline at end of file + assert "cancelled" in result.output.lower() + + +class FakeCache: + def __init__(self, directory): + self.directory = str(directory) + self._keys = [] + self._size = 0 + + def iterkeys(self): + return iter(self._keys) + + def volume(self): + return self._size + + def seed(self, keys, size): + self._keys = list(keys) + self._size = size + + +class FakeDataset: + def __init__(self, directory, trigger_attr): + self.cache = FakeCache(directory) + self.trigger_attr = trigger_attr + self.urls = [] + self.clear_count = 0 + + def _get_cache(self): + return self.cache + + def clear_cache(self): + self.clear_count += 1 + self.cache.seed([], 0) + + def set_url(self, url): + self.urls.append(url) + + def __getattr__(self, name): + if name == self.trigger_attr: + self.cache.seed(["index"], 42) + return {"loaded": True} + raise AttributeError(name) + + +def install_fake_sigma_data(monkeypatch, tmp_path): + attack_dataset = FakeDataset(tmp_path / "attack-cache", "mitre_attack_techniques_tactics_mapping") + d3fend_dataset = FakeDataset(tmp_path / "d3fend-cache", "mitre_d3fend_techniques") + fake_sigma_data = types.ModuleType("sigma.data") + fake_sigma_data.mitre_attack = attack_dataset + fake_sigma_data.mitre_d3fend = d3fend_dataset + monkeypatch.setitem(sys.modules, "sigma.data", fake_sigma_data) + return attack_dataset, d3fend_dataset + + +def test_update_cache_with_url_overrides(monkeypatch, tmp_path): + cli = CliRunner() + attack_dataset, d3fend_dataset = install_fake_sigma_data(monkeypatch, tmp_path) + attack_path = str(tmp_path / "attack.json") + d3fend_path = str(tmp_path / "d3fend.json") + + result = cli.invoke( + pysigma_group, + [ + "update-cache", + "-y", + "--url", + f"mitre_attack:{attack_path}", + "--url", + f"mitre_d3fend:{d3fend_path}", + ], + ) + + assert result.exit_code == 0 + assert attack_dataset.urls == [attack_path] + assert d3fend_dataset.urls == [d3fend_path] + assert attack_dataset.clear_count == 1 + assert d3fend_dataset.clear_count == 1 + assert "Cache updated successfully" in result.output + + +def test_update_cache_rejects_invalid_url_format(monkeypatch, tmp_path): + cli = CliRunner() + install_fake_sigma_data(monkeypatch, tmp_path) + + result = cli.invoke( + pysigma_group, + ["update-cache", "-y", "--url", "mitre_attack"], + ) + + assert result.exit_code != 0 + assert "dataset:url" in result.output + + +def test_update_cache_rejects_unknown_dataset(monkeypatch, tmp_path): + cli = CliRunner() + install_fake_sigma_data(monkeypatch, tmp_path) + + result = cli.invoke( + pysigma_group, + ["update-cache", "-y", "--url", "unknown:/tmp/dataset.json"], + ) + + assert result.exit_code != 0 + assert "unknown dataset 'unknown'" in result.output \ No newline at end of file