From 704cb2db8f937b57b962cad945ebf67d554427e9 Mon Sep 17 00:00:00 2001 From: Noctua Date: Tue, 28 Apr 2026 20:36:03 +0000 Subject: [PATCH] chore: update charm libraries --- .../lib/charms/loki_k8s/v1/loki_push_api.py | 241 +----------------- 1 file changed, 6 insertions(+), 235 deletions(-) diff --git a/worker/lib/charms/loki_k8s/v1/loki_push_api.py b/worker/lib/charms/loki_k8s/v1/loki_push_api.py index 3e2e9bd2..fd3ee2ad 100644 --- a/worker/lib/charms/loki_k8s/v1/loki_push_api.py +++ b/worker/lib/charms/loki_k8s/v1/loki_push_api.py @@ -406,7 +406,7 @@ def __init__(self, *args): This directory must reside at the top level in the `src` folder of the consumer charm. Each file in this directory is assumed to be a single alert rule -in YAML format. The file name must have the `.rule` extension. +in YAML format. The file name must have one of the following extensions: `.yaml`, `.yml`, `.rule`, or `.rules`. The format of this alert rule conforms to the [Loki docs](https://grafana.com/docs/loki/latest/rules/#alerting-rules). @@ -507,7 +507,6 @@ def __init__(self, ...): import socket import subprocess import tempfile -import typing import warnings from copy import deepcopy from gzip import GzipFile @@ -520,6 +519,7 @@ def __init__(self, ...): import yaml from cosl import JujuTopology +from cosl.rules import AlertRules from ops.charm import ( CharmBase, HookEvent, @@ -545,7 +545,7 @@ def __init__(self, ...): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 22 +LIBPATCH = 23 PYDEPS = ["cosl"] @@ -755,237 +755,6 @@ def _is_single_alert_rule_format(rules_dict: dict) -> bool: # one alert rule per file return set(rules_dict) >= {"alert", "expr"} - -class AlertRules: - """Utility class for amalgamating Loki alert rule files and injecting juju topology. - - An `AlertRules` object supports aggregating alert rules from files and directories in both - official and single rule file formats using the `add_path()` method. All the alert rules - read are annotated with Juju topology labels and amalgamated into a single data structure - in the form of a Python dictionary using the `as_dict()` method. Such a dictionary can be - easily dumped into JSON format and exchanged over relation data. The dictionary can also - be dumped into YAML format and written directly into an alert rules file that is read by - Loki. Note that multiple `AlertRules` objects must not be written into the same file, - since Loki allows only a single list of alert rule groups per alert rules file. - - The official Loki format is a YAML file conforming to the Loki documentation - (https://grafana.com/docs/loki/latest/api/#list-rule-groups). - The custom single rule format is a subsection of the official YAML, having a single alert - rule, effectively "one alert per file". - """ - - # This class uses the following terminology for the various parts of a rule file: - # - alert rules file: the entire groups[] yaml, including the "groups:" key. - # - alert groups (plural): the list of groups[] (a list, i.e. no "groups:" key) - it is a list - # of dictionaries that have the "name" and "rules" keys. - # - alert group (singular): a single dictionary that has the "name" and "rules" keys. - # - alert rules (plural): all the alerts in a given alert group - a list of dictionaries with - # the "alert" and "expr" keys. - # - alert rule (singular): a single dictionary that has the "alert" and "expr" keys. - - def __init__(self, topology: Optional[JujuTopology] = None): - """Build and alert rule object. - - Args: - topology: a `JujuTopology` instance that is used to annotate all alert rules. - """ - self.topology = topology - self.tool = CosTool(None) - self.alert_groups = [] # type: List[dict] - - def _from_file(self, root_path: Path, file_path: Path) -> List[dict]: - """Read a rules file from path, injecting juju topology. - - Args: - root_path: full path to the root rules folder (used only for generating group name) - file_path: full path to a *.rule file. - - Returns: - A list of dictionaries representing the rules file, if file is valid (the structure is - formed by `yaml.safe_load` of the file); an empty list otherwise. - """ - with file_path.open() as rf: - # Load a list of rules from file then add labels and filters - try: - rule_file = yaml.safe_load(rf) or {} - - except Exception as e: - logger.error("Failed to read alert rules from %s: %s", file_path.name, e) - return [] - - if _is_official_alert_rule_format(rule_file): - alert_groups = rule_file["groups"] - elif _is_single_alert_rule_format(rule_file): - # convert to list of alert groups - # group name is made up from the file name - alert_groups = [{"name": file_path.stem, "rules": [rule_file]}] - else: - # invalid/unsupported - reason = "file is empty" if not rule_file else "unexpected file structure" - logger.error("Invalid rules file (%s): %s", reason, file_path.name) - return [] - - # update rules with additional metadata - for alert_group in alert_groups: - # update group name with topology and sub-path - alert_group["name"] = self._group_name( - str(root_path), - str(file_path), - alert_group["name"], - ) - - # add "juju_" topology labels - for alert_rule in alert_group["rules"]: - if "labels" not in alert_rule: - alert_rule["labels"] = {} - - if self.topology: - # only insert labels that do not already exist - for label, val in self.topology.label_matcher_dict.items(): - if label not in alert_rule["labels"]: - alert_rule["labels"][label] = val - - # insert juju topology filters into a prometheus alert rule - # logql doesn't like empty matchers, so add a job matcher which hits - # any string as a "wildcard" which the topology labels will - # filter down - alert_rule["expr"] = self.tool.inject_label_matchers( - re.sub(r"%%juju_topology%%", r'job=~".+"', alert_rule["expr"]), - self.topology.label_matcher_dict, - ) - - return alert_groups - - def _group_name( - self, - root_path: typing.Union[Path, str], - file_path: typing.Union[Path, str], - group_name: str, - ) -> str: - """Generate group name from path and topology. - - The group name is made up of the relative path between the root dir_path, the file path, - and topology identifier. - - Args: - root_path: path to the root rules dir. - file_path: path to rule file. - group_name: original group name to keep as part of the new augmented group name - - Returns: - New group name, augmented by juju topology and relative path. - """ - file_path = Path(file_path) if not isinstance(file_path, Path) else file_path - root_path = Path(root_path) if not isinstance(root_path, Path) else root_path - rel_path = file_path.parent.relative_to(root_path.as_posix()) - - # We should account for both absolute paths and Windows paths. Convert it to a POSIX - # string, strip off any leading /, then join it - - path_str = "" - if not rel_path == Path("."): - # Get rid of leading / and optionally drive letters so they don't muck up - # the template later, since Path.parts returns them. The 'if relpath.is_absolute ...' - # isn't even needed since re.sub doesn't throw exceptions if it doesn't match, so it's - # optional, but it makes it clear what we're doing. - - # Note that Path doesn't actually care whether the path is valid just to instantiate - # the object, so we can happily strip that stuff out to make templating nicer - rel_path = Path( - re.sub(r"^([A-Za-z]+:)?/", "", rel_path.as_posix()) - if rel_path.is_absolute() - else str(rel_path) - ) - - # Get rid of relative path characters in the middle which both os.path and pathlib - # leave hanging around. We could use path.resolve(), but that would lead to very - # long template strings when rules come from pods and/or other deeply nested charm - # paths - path_str = "_".join(filter(lambda x: x not in ["..", "/"], rel_path.parts)) - - # Generate group name: - # - name, from juju topology - # - suffix, from the relative path of the rule file; - group_name_parts = [self.topology.identifier] if self.topology else [] - group_name_parts.extend([path_str, group_name, "alerts"]) - # filter to remove empty strings - return "_".join(filter(lambda x: x, group_name_parts)) - - @classmethod - def _multi_suffix_glob( - cls, dir_path: Path, suffixes: List[str], recursive: bool = True - ) -> list: - """Helper function for getting all files in a directory that have a matching suffix. - - Args: - dir_path: path to the directory to glob from. - suffixes: list of suffixes to include in the glob (items should begin with a period). - recursive: a flag indicating whether a glob is recursive (nested) or not. - - Returns: - List of files in `dir_path` that have one of the suffixes specified in `suffixes`. - """ - all_files_in_dir = dir_path.glob("**/*" if recursive else "*") - return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir)) - - def _from_dir(self, dir_path: Path, recursive: bool) -> List[dict]: - """Read all rule files in a directory. - - All rules from files for the same directory are loaded into a single - group. The generated name of this group includes juju topology. - By default, only the top directory is scanned; for nested scanning, pass `recursive=True`. - - Args: - dir_path: directory containing *.rule files (alert rules without groups). - recursive: flag indicating whether to scan for rule files recursively. - - Returns: - a list of dictionaries representing prometheus alert rule groups, each dictionary - representing an alert group (structure determined by `yaml.safe_load`). - """ - alert_groups = [] # type: List[dict] - - # Gather all alerts into a list of groups - for file_path in self._multi_suffix_glob(dir_path, [".rule", ".rules"], recursive): - alert_groups_from_file = self._from_file(dir_path, file_path) - if alert_groups_from_file: - logger.debug("Reading alert rule from %s", file_path) - alert_groups.extend(alert_groups_from_file) - - return alert_groups - - def add_path(self, path_str: str, *, recursive: bool = False): - """Add rules from a dir path. - - All rules from files are aggregated into a data structure representing a single rule file. - All group names are augmented with juju topology. - - Args: - path_str: either a rules file or a dir of rules files. - recursive: whether to read files recursively or not (no impact if `path` is a file). - - Raises: - InvalidAlertRulePathError: if the provided path is invalid. - """ - path = Path(path_str) # type: Path - if path.is_dir(): - self.alert_groups.extend(self._from_dir(path, recursive)) - elif path.is_file(): - self.alert_groups.extend(self._from_file(path.parent, path)) - else: - logger.debug("The alerts file does not exist: %s", path) - - def as_dict(self) -> dict: - """Return standard alert rules file in dict representation. - - Returns: - a dictionary containing a single list of alert rule groups. - The list of alert rule groups is provided as value of the - "groups" dictionary key. - """ - return {"groups": self.alert_groups} if self.alert_groups else {} - - def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str: """Resolve the provided path items against the directory of the main file. @@ -1599,7 +1368,9 @@ def _handle_alert_rules(self, relation): return alert_rules = ( - AlertRules(None) if self._skip_alert_topology_labeling else AlertRules(self.topology) + AlertRules(query_type="logql") + if self._skip_alert_topology_labeling + else AlertRules(query_type="logql", topology=self.topology) ) if self._forward_alert_rules: alert_rules.add_path(self._alert_rules_path, recursive=self._recursive)