From bd4d9a588c8776a2b4b05e50a68587f65047f660 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:42:44 +0000 Subject: [PATCH 1/3] Initial plan From 0ccb39a6fdd478d4242f23afeb8e8a9bd4be18a7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:31:07 +0000 Subject: [PATCH 2/3] Add Plugin column to sigma list targets and sigma list pipelines Agent-Logs-Url: https://github.com/SigmaHQ/sigma-cli/sessions/ba0da45e-4bc2-48de-88f1-4cb5e11a1846 Co-authored-by: thomaspatzke <1845601+thomaspatzke@users.noreply.github.com> --- sigma/cli/list.py | 92 ++++++++++++++++++++++++++++++++++++++++++++- tests/test_lists.py | 25 +++++++++--- 2 files changed, 110 insertions(+), 7 deletions(-) diff --git a/sigma/cli/list.py b/sigma/cli/list.py index 6c9cc98..8974457 100644 --- a/sigma/cli/list.py +++ b/sigma/cli/list.py @@ -1,3 +1,10 @@ +import importlib +import importlib.metadata +import os +import site +import sys +from functools import lru_cache + import click from sigma.plugins import InstalledSigmaPlugins from sigma.modifiers import modifier_mapping @@ -13,6 +20,80 @@ plugins = InstalledSigmaPlugins.autodiscover() +def _site_dirs() -> frozenset: + """Return all known site-packages directories.""" + dirs = set() + try: + dirs.update(site.getsitepackages()) + except AttributeError: + pass + try: + dirs.add(site.getusersitepackages()) + except AttributeError: + pass + # Fall back to sys.path entries that contain "site-packages" + if not dirs: + dirs.update(p for p in sys.path if "site-packages" in p) + return frozenset(dirs) + + +@lru_cache(maxsize=None) +def _file_to_dist_mapping() -> dict: + """Build and cache a mapping from relative file paths to distribution names.""" + mapping: dict = {} + for dist in importlib.metadata.distributions(): + dist_name = dist.metadata.get("Name", "") + for f in dist.files or []: + mapping[str(f).replace(os.sep, "/")] = dist_name + return mapping + + +def _get_module_package(module_name: str) -> str: + """Return the distribution package name that provides the given module.""" + try: + mod = importlib.import_module(module_name) + mod_file = getattr(mod, "__file__", None) + if mod_file is None: + return "n/a" + mapping = _file_to_dist_mapping() + for path in _site_dirs(): + if mod_file.startswith(path): + rel = os.path.relpath(mod_file, path).replace(os.sep, "/") + return mapping.get(rel, "n/a") + except (ImportError, AttributeError, ValueError): + pass + return "n/a" + + +def _get_backend_package(backend_class) -> str: + """Return the distribution package name for a backend class.""" + return _get_module_package(backend_class.__module__) + + +def _get_pipeline_package(pipeline_obj) -> str: + """Return the distribution package name for a pipeline object or function.""" + # Plain function: use its own module + module_name = getattr(pipeline_obj, "__module__", None) + if module_name and module_name != "sigma.pipelines.base": + return _get_module_package(module_name) + # Pipeline-decorator instance: retrieve the wrapped function's module + func = getattr(pipeline_obj, "func", None) + if func is not None: + func_module = getattr(func, "__module__", None) + if func_module: + return _get_module_package(func_module) + return "n/a" + + +# Pre-compute package names for all discovered pipelines (keyed by identifier). +# This must be done before the resolver resolves pipeline callables into plain +# ProcessingPipeline objects, which no longer carry module information. +_pipeline_packages: dict = { + name: _get_pipeline_package(pipeline) + for name, pipeline in plugins.pipelines.items() +} + + @click.group(name="list", help="List available targets or processing pipelines.") def list_group(): pass @@ -32,6 +113,7 @@ def list_targets(): "Identifier", "Target Query Language", "Processing Pipeline Required", + "Plugin", ) table.add_rows( [ @@ -39,6 +121,7 @@ def list_targets(): name, fill(backend.name, width=60), "Yes" if backend.requires_pipeline else "No", + _get_backend_package(backend), ) for name, backend in plugins.backends.items() ] @@ -108,6 +191,7 @@ def list_pipelines(backend): "Priority", "Processing Pipeline", "Backends", + "Plugin", ) for name, pipeline in pipelines: if ( @@ -120,7 +204,13 @@ def list_pipelines(backend): else: backends = "all" table.add_row( - (name, pipeline.priority, fill(pipeline.name, width=60), backends) + ( + name, + pipeline.priority, + fill(pipeline.name, width=60), + backends, + _pipeline_packages.get(name, "n/a"), + ) ) table.align = "l" click.echo(table.get_string()) diff --git a/tests/test_lists.py b/tests/test_lists.py index e0e4783..4254833 100644 --- a/tests/test_lists.py +++ b/tests/test_lists.py @@ -125,10 +125,23 @@ def test_transformation_list(): assert all((name in result.stdout for name in transformations.keys())) -def test_condition_list(): +def test_targets_has_plugin_column(): cli = CliRunner() - result = cli.invoke(list_conditions) - conditions = list(rule_conditions.keys()) - conditions.extend(detection_item_conditions.keys()) - conditions.extend(field_name_conditions.keys()) - assert all((name in result.stdout for name in conditions)) + result = cli.invoke(list_targets) + assert result.exit_code == 0 + assert "Plugin" in result.stdout + # Each row should show a non-empty package name (not "n/a") for installed backends. + plugins = InstalledSigmaPlugins.autodiscover() + if plugins.backends: + assert "n/a" not in result.stdout + + +def test_pipelines_has_plugin_column(): + cli = CliRunner() + result = cli.invoke(list_pipelines) + assert result.exit_code == 0 + assert "Plugin" in result.stdout + # Each row should show a non-empty package name (not "n/a") for installed pipelines. + plugins = InstalledSigmaPlugins.autodiscover() + if plugins.pipelines: + assert "n/a" not in result.stdout From be13a2717313c013e6e66c57347d3fee95114aa5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 3 Apr 2026 11:36:02 +0000 Subject: [PATCH 3/3] Show Sigma plugin identifier in Plugin column instead of Python package name Agent-Logs-Url: https://github.com/SigmaHQ/sigma-cli/sessions/5f95be44-fd6c-48ca-97c8-91041626a62c Co-authored-by: thomaspatzke <1845601+thomaspatzke@users.noreply.github.com> --- sigma/cli/list.py | 86 ++++++++++++++--------------------------------- 1 file changed, 26 insertions(+), 60 deletions(-) diff --git a/sigma/cli/list.py b/sigma/cli/list.py index 8974457..a3a3fe7 100644 --- a/sigma/cli/list.py +++ b/sigma/cli/list.py @@ -1,10 +1,3 @@ -import importlib -import importlib.metadata -import os -import site -import sys -from functools import lru_cache - import click from sigma.plugins import InstalledSigmaPlugins from sigma.modifiers import modifier_mapping @@ -20,76 +13,49 @@ plugins = InstalledSigmaPlugins.autodiscover() -def _site_dirs() -> frozenset: - """Return all known site-packages directories.""" - dirs = set() - try: - dirs.update(site.getsitepackages()) - except AttributeError: - pass - try: - dirs.add(site.getusersitepackages()) - except AttributeError: - pass - # Fall back to sys.path entries that contain "site-packages" - if not dirs: - dirs.update(p for p in sys.path if "site-packages" in p) - return frozenset(dirs) - +def _plugin_id_from_module(module_name: str, namespace: str) -> str: + """Extract the Sigma plugin identifier from a module name. -@lru_cache(maxsize=None) -def _file_to_dist_mapping() -> dict: - """Build and cache a mapping from relative file paths to distribution names.""" - mapping: dict = {} - for dist in importlib.metadata.distributions(): - dist_name = dist.metadata.get("Name", "") - for f in dist.files or []: - mapping[str(f).replace(os.sep, "/")] = dist_name - return mapping - - -def _get_module_package(module_name: str) -> str: - """Return the distribution package name that provides the given module.""" + Sigma backends live in ``sigma.backends..*`` and pipelines in + ``sigma.pipelines..*``. Splitting on '.' and picking the + component after the namespace prefix returns the plugin identifier directly + (e.g. ``sigma.backends.splunk.backend`` → ``splunk``). + """ + parts = module_name.split(".") + # Expected layout: sigma . . [. submodule ...] try: - mod = importlib.import_module(module_name) - mod_file = getattr(mod, "__file__", None) - if mod_file is None: - return "n/a" - mapping = _file_to_dist_mapping() - for path in _site_dirs(): - if mod_file.startswith(path): - rel = os.path.relpath(mod_file, path).replace(os.sep, "/") - return mapping.get(rel, "n/a") - except (ImportError, AttributeError, ValueError): - pass - return "n/a" + idx = parts.index(namespace) + plugin_id = parts[idx + 1] + return plugin_id if plugin_id else "n/a" + except (ValueError, IndexError): + return "n/a" -def _get_backend_package(backend_class) -> str: - """Return the distribution package name for a backend class.""" - return _get_module_package(backend_class.__module__) +def _get_backend_plugin_id(backend_class) -> str: + """Return the Sigma plugin identifier for a backend class.""" + return _plugin_id_from_module(backend_class.__module__, "backends") -def _get_pipeline_package(pipeline_obj) -> str: - """Return the distribution package name for a pipeline object or function.""" +def _get_pipeline_plugin_id(pipeline_obj) -> str: + """Return the Sigma plugin identifier for a pipeline object or function.""" # Plain function: use its own module module_name = getattr(pipeline_obj, "__module__", None) if module_name and module_name != "sigma.pipelines.base": - return _get_module_package(module_name) + return _plugin_id_from_module(module_name, "pipelines") # Pipeline-decorator instance: retrieve the wrapped function's module func = getattr(pipeline_obj, "func", None) if func is not None: func_module = getattr(func, "__module__", None) if func_module: - return _get_module_package(func_module) + return _plugin_id_from_module(func_module, "pipelines") return "n/a" -# Pre-compute package names for all discovered pipelines (keyed by identifier). +# Pre-compute plugin IDs for all discovered pipelines (keyed by identifier). # This must be done before the resolver resolves pipeline callables into plain # ProcessingPipeline objects, which no longer carry module information. -_pipeline_packages: dict = { - name: _get_pipeline_package(pipeline) +_pipeline_plugin_ids: dict = { + name: _get_pipeline_plugin_id(pipeline) for name, pipeline in plugins.pipelines.items() } @@ -121,7 +87,7 @@ def list_targets(): name, fill(backend.name, width=60), "Yes" if backend.requires_pipeline else "No", - _get_backend_package(backend), + _get_backend_plugin_id(backend), ) for name, backend in plugins.backends.items() ] @@ -209,7 +175,7 @@ def list_pipelines(backend): pipeline.priority, fill(pipeline.name, width=60), backends, - _pipeline_packages.get(name, "n/a"), + _pipeline_plugin_ids.get(name, "n/a"), ) ) table.align = "l"