Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 64 additions & 38 deletions sigma/cli/pysigma.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,52 @@
from packaging.specifiers import SpecifierSet
from prettytable import PrettyTable


def get_cache_datasets():
    """Describe every pySigma dataset whose data can be cached locally.

    Each descriptor carries the module that owns the cache plus the keys
    used to look up the cached payload ('cache_key'), its version
    ('version_key'), and the attribute whose access triggers a (re-)load
    of the data ('trigger_attr').

    Returns:
        list[dict]: one descriptor per known dataset.
    """
    # Imported lazily so the CLI module still loads when the pySigma data
    # modules are unavailable; callers handle the resulting ImportError.
    from sigma.data import mitre_attack, mitre_d3fend

    attack_entry = {
        'key': 'mitre_attack',
        'name': 'MITRE ATT&CK',
        'module': mitre_attack,
        'cache_key': 'mitre_attack_data_default',
        'version_key': 'mitre_attack_version',
        'trigger_attr': 'mitre_attack_techniques_tactics_mapping',
    }
    d3fend_entry = {
        'key': 'mitre_d3fend',
        'name': 'MITRE D3FEND',
        'module': mitre_d3fend,
        'cache_key': 'mitre_d3fend_data_default',
        'version_key': 'mitre_d3fend_version',
        'trigger_attr': 'mitre_d3fend_techniques',
    }
    return [attack_entry, d3fend_entry]


def parse_dataset_url_overrides(urls, datasets):
    """Turn raw ``dataset:url`` strings from ``--url`` options into a mapping.

    Args:
        urls: iterable of raw override strings, each of the form
            ``dataset:url`` (the url part may itself contain colons).
        datasets: dataset descriptors (as returned by get_cache_datasets());
            only their 'key' values are consulted.

    Returns:
        dict: dataset key -> override URL. A repeated key keeps the last
        value given.

    Raises:
        click.BadParameter: if an entry is malformed or names a dataset
            that is not in *datasets*.
    """
    known_keys = {entry['key'] for entry in datasets}
    overrides = {}

    for raw_override in urls:
        name, sep, url = raw_override.partition(":")
        # All three pieces must be non-empty: "x:", ":y" and "x" are invalid.
        if not (sep and name and url):
            raise click.BadParameter(
                "must use the format dataset:url",
                param_hint="--url",
            )
        if name not in known_keys:
            raise click.BadParameter(
                f"unknown dataset '{name}'. Available datasets: {', '.join(sorted(known_keys))}",
                param_hint="--url",
            )
        overrides[name] = url

    return overrides

def get_pysigma_requirement():
requires = importlib.metadata.requires("sigma-cli")
return [
Expand Down Expand Up @@ -81,23 +127,7 @@ def check_pysigma(quiet=False):
def list_cache_command():
"""List the cached versions of pySigma data and their timestamps."""
try:
from sigma.data import mitre_attack, mitre_d3fend

# Configuration for datasets to check
datasets = [
{
'name': 'MITRE ATT&CK',
'module': mitre_attack,
'cache_key': 'mitre_attack_data_default',
'version_key': 'mitre_attack_version'
},
{
'name': 'MITRE D3FEND',
'module': mitre_d3fend,
'cache_key': 'mitre_d3fend_data_default',
'version_key': 'mitre_d3fend_version'
}
]
datasets = get_cache_datasets()

table = PrettyTable()
table.field_names = ["Dataset", "Version", "Cached Date"]
Expand Down Expand Up @@ -146,12 +176,7 @@ def list_cache_command():
def clear_cache_command(yes):
"""Delete the cached data for all datasets."""
try:
from sigma.data import mitre_attack, mitre_d3fend

datasets = [
{'name': 'MITRE ATT&CK', 'module': mitre_attack},
{'name': 'MITRE D3FEND', 'module': mitre_d3fend}
]
datasets = get_cache_datasets()

# Check what's cached
cached_datasets = []
Expand Down Expand Up @@ -215,23 +240,22 @@ def clear_cache_command(yes):
is_flag=True,
help="Skip confirmation prompt.",
)
def update_cache_command(yes):
@click.option(
"--url",
"urls",
multiple=True,
help="Override a dataset source in the format dataset:url.",
)
def update_cache_command(yes, urls):
"""Update the cache by deleting it and re-caching data for all datasets."""
try:
from sigma.data import mitre_attack, mitre_d3fend

datasets = [
{
'name': 'MITRE ATT&CK',
'module': mitre_attack,
'trigger_attr': 'mitre_attack_techniques_tactics_mapping'
},
{
'name': 'MITRE D3FEND',
'module': mitre_d3fend,
'trigger_attr': 'mitre_d3fend_techniques'
}
]
datasets = get_cache_datasets()
url_overrides = parse_dataset_url_overrides(urls, datasets)

for dataset in datasets:
dataset_url = url_overrides.get(dataset['key'])
if dataset_url is not None:
dataset['module'].set_url(dataset_url)

# Get current cache info
cached_datasets = []
Expand Down Expand Up @@ -298,5 +322,7 @@ def update_cache_command(yes):
except ImportError:
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
click.echo("Make sure pySigma is installed correctly.")
except click.ClickException:
raise
except Exception as e:
click.echo(click.style(f"Error updating cache: {str(e)}", fg="red"))
112 changes: 109 additions & 3 deletions tests/test_pysigma.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import importlib
import re
import sys
import types
from sigma.cli.pysigma import pysigma_group, check_pysigma_version
from click.testing import CliRunner
import pytest
Expand Down Expand Up @@ -68,15 +70,15 @@ def test_clear_cache_with_confirmation_cancel():
cli = CliRunner()
result = cli.invoke(pysigma_group, ["clear-cache"], input="n\n")
assert result.exit_code == 0
assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output
assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "nothing to clear" in result.output.lower() or "No cache directory found" in result.output


def test_clear_cache_with_yes_flag():
"""Test clear-cache command with -y flag."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["clear-cache", "-y"])
assert result.exit_code == 0
assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output
assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "nothing to clear" in result.output.lower() or "No cache directory found" in result.output


def test_update_cache_help():
Expand All @@ -86,11 +88,115 @@ def test_update_cache_help():
assert result.exit_code == 0
assert "Update cache" in result.output
assert "--yes" in result.output or "-y" in result.output
assert "--url" in result.output


def test_update_cache_with_confirmation_cancel():
"""Test update-cache command cancellation."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["update-cache"], input="n\n")
assert result.exit_code == 0
assert "cancelled" in result.output.lower()
assert "cancelled" in result.output.lower()


class FakeCache:
    """Minimal in-memory stand-in for the cache object a dataset exposes.

    Mimics only the pieces the CLI commands touch: the cache directory
    path, key iteration, and the reported volume (size).
    """

    def __init__(self, directory):
        self.directory = str(directory)
        self._keys = []   # keys currently "stored"
        self._size = 0    # fake cache size

    def iterkeys(self):
        """Yield the stored keys."""
        yield from self._keys

    def volume(self):
        """Return the fake cache size."""
        return self._size

    def seed(self, keys, size):
        """Replace the fake cache contents with *keys* totalling *size*."""
        self._keys = list(keys)
        self._size = size


class FakeDataset:
    """Fake pySigma data module that records cache/URL interactions.

    ``set_url`` and ``clear_cache`` calls are recorded so tests can assert
    on them; reading the configured ``trigger_attr`` simulates a data load
    by seeding the cache.
    """

    def __init__(self, directory, trigger_attr):
        self.cache = FakeCache(directory)
        self.trigger_attr = trigger_attr
        self.urls = []          # every URL passed to set_url(), in order
        self.clear_count = 0    # number of clear_cache() invocations

    def _get_cache(self):
        return self.cache

    def clear_cache(self):
        """Record the call and empty the fake cache."""
        self.clear_count += 1
        self.cache.seed([], 0)

    def set_url(self, url):
        """Record a source-URL override."""
        self.urls.append(url)

    def __getattr__(self, name):
        # Only the configured trigger attribute "loads" data; any other
        # missing attribute behaves normally.
        if name != self.trigger_attr:
            raise AttributeError(name)
        self.cache.seed(["index"], 42)
        return {"loaded": True}


def install_fake_sigma_data(monkeypatch, tmp_path):
    """Register a fake ``sigma.data`` module backed by FakeDataset stubs.

    Returns the (attack, d3fend) dataset stubs so tests can inspect the
    calls the CLI made against them.
    """
    attack = FakeDataset(tmp_path / "attack-cache", "mitre_attack_techniques_tactics_mapping")
    d3fend = FakeDataset(tmp_path / "d3fend-cache", "mitre_d3fend_techniques")

    module = types.ModuleType("sigma.data")
    module.mitre_attack = attack
    module.mitre_d3fend = d3fend
    # monkeypatch restores sys.modules after the test, so the fake module
    # never leaks into other tests.
    monkeypatch.setitem(sys.modules, "sigma.data", module)

    return attack, d3fend


def test_update_cache_with_url_overrides(monkeypatch, tmp_path):
    """update-cache --url must route each override to the matching dataset."""
    runner = CliRunner()
    attack_dataset, d3fend_dataset = install_fake_sigma_data(monkeypatch, tmp_path)
    attack_path = str(tmp_path / "attack.json")
    d3fend_path = str(tmp_path / "d3fend.json")

    args = [
        "update-cache",
        "-y",
        "--url", f"mitre_attack:{attack_path}",
        "--url", f"mitre_d3fend:{d3fend_path}",
    ]
    result = runner.invoke(pysigma_group, args)

    assert result.exit_code == 0
    # Each dataset saw exactly its own override...
    assert attack_dataset.urls == [attack_path]
    assert d3fend_dataset.urls == [d3fend_path]
    # ...and each cache was rebuilt exactly once.
    assert attack_dataset.clear_count == 1
    assert d3fend_dataset.clear_count == 1
    assert "Cache updated successfully" in result.output


def test_update_cache_rejects_invalid_url_format(monkeypatch, tmp_path):
    """An override missing the ':url' part must be rejected as a bad parameter."""
    install_fake_sigma_data(monkeypatch, tmp_path)

    result = CliRunner().invoke(
        pysigma_group,
        ["update-cache", "-y", "--url", "mitre_attack"],
    )

    assert result.exit_code != 0
    assert "dataset:url" in result.output


def test_update_cache_rejects_unknown_dataset(monkeypatch, tmp_path):
    """Overriding a dataset the CLI does not know about must fail loudly."""
    install_fake_sigma_data(monkeypatch, tmp_path)

    result = CliRunner().invoke(
        pysigma_group,
        ["update-cache", "-y", "--url", "unknown:/tmp/dataset.json"],
    )

    assert result.exit_code != 0
    assert "unknown dataset 'unknown'" in result.output
Loading