diff --git a/alpacloud/eztag/BUILD b/alpacloud/eztag/BUILD new file mode 100644 index 0000000..2aabcbb --- /dev/null +++ b/alpacloud/eztag/BUILD @@ -0,0 +1,35 @@ +python_sources() + +python_tests( + name="tests", + # dependencies=["./test_resources:k8s_objs"], +) + +python_test_utils( + name="test_utils", +) + +python_distribution( + name="alpacloud.eztag", + repositories=["@alpacloud.eztag"], + dependencies=[":eztag"], + long_description_path="alpacloud/eztag/readme.md", + provides=python_artifact( + name="alpacloud_eztag", + version="0.1.0", + description="A library for filtering things based on tags", + author="Daniel Goldman", + classifiers=[ + "Development Status :: 3 - Alpha", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Topic :: Utilities", + "Topic :: System :: Systems Administration", + ], + license="Round Robin 2.0.0", + long_description_content_type="text/markdown", + ), +) diff --git a/alpacloud/eztag/__init__.py b/alpacloud/eztag/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alpacloud/eztag/logic.py b/alpacloud/eztag/logic.py new file mode 100644 index 0000000..6df4df0 --- /dev/null +++ b/alpacloud/eztag/logic.py @@ -0,0 +1,98 @@ +"""Expressions for tag filtering""" + +from abc import ABC +from dataclasses import dataclass +from typing import Callable + +from alpacloud.eztag.tag import TagSet + + +class Expr(ABC): + """A predicate""" + + def check(self, tags: TagSet) -> bool: + """Check if the condition is satisfied""" + raise NotImplementedError + + +@dataclass(frozen=True, slots=True) +class Cond_(Expr): + """A condition that checks if a tag set satisfies a predicate""" + + f: Callable[[TagSet], bool] + + def check(self, tags: TagSet) -> bool: + return self.f(tags) + + +@dataclass(frozen=True, slots=True) +class And_(Expr): + """AND of multiple 
conditions""" + + conds: list[Expr] + + def check(self, tags: TagSet) -> bool: + return all(cond.check(tags) for cond in self.conds) + + +@dataclass(frozen=True, slots=True) +class Or_(Expr): + """OR of multiple conditions""" + + conds: list[Expr] + + def check(self, tags: TagSet) -> bool: + return any(cond.check(tags) for cond in self.conds) + + +@dataclass(frozen=True, slots=True) +class Not_(Expr): + """Negation of a condition""" + + cond: Expr + + def check(self, tags: TagSet) -> bool: + return not self.cond.check(tags) + + +@dataclass(frozen=True, slots=True) +class TagHas(Expr): + """Check if a tag set has a given tag, with any value""" + + k: str + + def check(self, tags: TagSet) -> bool: + return tags.has(self.k) + + +@dataclass(frozen=True, slots=True) +class TagMatch(Expr): + """Check if a tag set has a given tag with a specific value""" + + k: str + v: str | None + + def check(self, tags: TagSet) -> bool: + return tags.match(self.k, self.v) + + +@dataclass(frozen=True, slots=True) +class TagRematch(Expr): + """Check if a tag set has a given tag with value matching a regular expression""" + + k: str + v: str + + def check(self, tags: TagSet) -> bool: + return tags.rematch(self.k, self.v) + + +@dataclass(frozen=True, slots=True) +class TagContains(Expr): + """Check if a tag set has a given tag whose value contains a given substring""" + + k: str + v: str + + def check(self, tags: TagSet) -> bool: + return tags.contains(self.k, self.v) diff --git a/alpacloud/eztag/multidict.py b/alpacloud/eztag/multidict.py new file mode 100644 index 0000000..0344d0c --- /dev/null +++ b/alpacloud/eztag/multidict.py @@ -0,0 +1,57 @@ +"""Generic multidict implementation. 
A multidict allows multiple values for the same key.""" + +from __future__ import annotations + +from typing import Iterable, TypeAlias, TypeGuard + +K: TypeAlias = str +V: TypeAlias = str | None + + +def _is_collection(obj) -> TypeGuard[Iterable]: + """ + Checks if an object is an iterable collection, excluding strings and bytes. + """ + return isinstance(obj, Iterable) and not isinstance(obj, (str, bytes, bytearray)) + + +class MultiDict: + """ + A dictionary that allows multiple values for the same key. + This allows us to have a tag set like `env=prd, env=stg` + """ + + def __init__(self): + self.d: dict[K, set[V]] = {} + + @classmethod + def from_dict(cls, d: dict[K, V]) -> MultiDict: + """Create a multidict from a dict of key-value pairs""" + md = cls() + for k, v in d.items(): + md[k] = v + return md + + @classmethod + def create(cls, d: dict[K, Iterable[V] | V]) -> MultiDict: + """ + Create a multidict from a dict of key-value pairs or key-list of values pairs + """ + md = cls() + for k, vs in d.items(): + n: set[V] + if not _is_collection(vs): + n = {vs} # type: ignore # idk typeguard + else: + n = set(vs) + md.d[k] = n + return md + + def __getitem__(self, key) -> set[V]: + return self.d[key] + + def __setitem__(self, key, value): + self.d.setdefault(key, set()).add(value) + + def __contains__(self, key) -> bool: + return key in self.d diff --git a/alpacloud/eztag/parser.py b/alpacloud/eztag/parser.py new file mode 100644 index 0000000..78b8dae --- /dev/null +++ b/alpacloud/eztag/parser.py @@ -0,0 +1,227 @@ +""" +Parse filter expressions into predicates for filtering tags. 
+ +The grammar is as follows: +regex_literal := "/" regex "/" +string_literal := any characters except "(),/" and spaces +expr := identifier(expr [, expr])* | regex_literal | string_literal +identifier := "and" | "or" | "not" | "has" | "match" | "re" | "contains" +""" + +from __future__ import annotations + +import abc +from dataclasses import dataclass +from typing import List, Literal, Optional + +from alpacloud.eztag import logic +from alpacloud.eztag.logic import Expr + + +@dataclass +class ParseState: + """State of the parser""" + + text: str + pos: int = 0 + + def peek(self) -> Optional[str]: + """Look at the next character in the input text without consuming it""" + return self.text[self.pos] if self.pos < len(self.text) else None + + def consume(self) -> Optional[str]: + """Consume the next character from the input text""" + if self.pos >= len(self.text): + return None + char = self.text[self.pos] + self.pos += 1 + return char + + def consume_while(self, predicate) -> str: + """ + Consume characters from the input text while the predicate is True + """ + result = [] + while self.peek() and predicate(self.peek()): + next_result = self.consume() + if next_result is not None: + result.append(next_result) + return "".join(result) + + +class ASTNode(abc.ABC): + """Abstract base class for AST nodes""" + + +@dataclass +class StringLiteral(ASTNode): + """AST node representing a string literal""" + + value: str + + +@dataclass +class FunctionCall(ASTNode): + """AST node representing a function call. 
Everything that isn't a literal is a function call.""" + + name: str + args: list[ASTNode] + + +@dataclass +class RegexLiteral(ASTNode): + """AST node representing a regex literal""" + + value: str + + +class Parser: + """Parses a filter expression into an AST""" + + reserved_chars = set("(),/") + + def __init__(self, text: str): + self.state = ParseState(text.strip()) + + def parse(self) -> FunctionCall: + """ + Parse a filter expression into an AST + """ + return self._parse_function_call() + + def _parse_function_call(self) -> FunctionCall: + name = self._parse_identifier() + if not name: + raise ValueError("Expected function name") + + if self.state.peek() != "(": + raise ValueError("Expected opening parenthesis") + self.state.consume() # consume '(' + + args = self._parse_arguments() + + if self.state.peek() != ")": + raise ValueError("Expected closing parenthesis") + self.state.consume() # consume ')' + + return FunctionCall(name, args) + + def _parse_identifier(self) -> str: + return self.state.consume_while(lambda c: c not in self.reserved_chars and not c.isspace()) + + def _parse_arguments(self) -> List[ASTNode]: + args: List[ASTNode] = [] + while True: + self.state.consume_while(str.isspace) + + if self.state.peek() == ")": + break + + if self.state.peek() == "/": + args.append(self._parse_regex()) + else: + args.append(self._parse_argument()) + + self.state.consume_while(str.isspace) + if self.state.peek() == ",": + self.state.consume() + elif self.state.peek() != ")": + raise ValueError("Expected comma or closing parenthesis") + + return args + + def _parse_regex(self) -> RegexLiteral: + self.state.consume() + v = RegexLiteral(self.state.consume_while(lambda c: c != "/")) + self.state.consume() + return v + + def _parse_argument(self) -> ASTNode: + self.state.consume_while(str.isspace) + + # Check if this argument is a function call or string literal + start_pos = self.state.pos + identifier = self._parse_identifier() + + if identifier and 
self.state.peek() == "(": + # It's a nested function call - parse it recursively + self.state.pos = start_pos # Reset position + return self._parse_function_call() + elif identifier: + # It's a string literal (identifier not followed by '(') + # Check that we're at a valid stopping point + self.state.consume_while(str.isspace) + next_char = self.state.peek() + if next_char not in (",", ")", None): + raise ValueError("Expected comma or closing parenthesis") + return StringLiteral(identifier) + else: + raise ValueError("Expected identifier") + + +@dataclass +class TokenTransformation: + """ + Associates the name of a function with the Expr implementing functionality. + + The `args` field associates positional arguments in the raw filter expression with the function's keyword arguments. + For example, MATCH takes a key (k) and a value (v), so `args = ["k", "v"]`. + For functions that take a variable number of arguments, such as AND and OR, `args = "variadic"`. + """ + + name: str + function: type[Expr] + args: list[str] | Literal["variadic"] + + +class TokenTransformer: + """Transforms AST nodes into Exprs""" + + def __init__(self, transformations: dict[str, TokenTransformation], case_sensitive_tokens: bool = False): + self.case_sensitive_tokens = case_sensitive_tokens + if case_sensitive_tokens: + self.transformations = transformations + else: + self.transformations = {k.lower(): v for k, v in transformations.items()} + + def transform(self, token: ASTNode) -> Expr | str: + """Transform an AST node into an Expr""" + match token: + case StringLiteral(): + return token.value + case RegexLiteral(): + return token.value + case FunctionCall(): + if self.case_sensitive_tokens: + transformation = self.transformations[token.name] + else: + transformation = self.transformations[token.name.lower()] + + if transformation.args == "variadic": + return transformation.function([self.transform(e) for e in token.args]) # type: ignore # the typesafety is done by the 
TokenTransformation + else: + raw_kwargs = dict(zip(transformation.args, token.args)) + kwargs = {k: self.transform(v) for k, v in raw_kwargs.items()} + return transformation.function(**kwargs) + case _: + raise ValueError(f"Unexpected token: {token} of type {type(token)}") + + def extended(self, more_transformers: dict[str, TokenTransformation]) -> TokenTransformer: + """Make a new TokenTransformer with additional transformations. New transformations take precedence over existing ones.""" + return TokenTransformer({**self.transformations, **more_transformers}, self.case_sensitive_tokens) + + +transformer = TokenTransformer( + { + e.name: e + for e in [ + TokenTransformation("NOT", logic.Not_, ["cond"]), + TokenTransformation("AND", logic.And_, "variadic"), + TokenTransformation("OR", logic.Or_, "variadic"), + TokenTransformation("HAS", logic.TagHas, ["k"]), + TokenTransformation("MATCH", logic.TagMatch, ["k", "v"]), + TokenTransformation("RE", logic.TagRematch, ["k", "v"]), + TokenTransformation("CONTAINS", logic.TagContains, ["k", "v"]), + ] + } +) diff --git a/alpacloud/eztag/parser_test.py b/alpacloud/eztag/parser_test.py new file mode 100644 index 0000000..836aa70 --- /dev/null +++ b/alpacloud/eztag/parser_test.py @@ -0,0 +1,273 @@ +"""Tests for parser module.""" + +from dataclasses import dataclass + +import pytest + +from alpacloud.eztag import logic +from alpacloud.eztag.logic import And_, TagMatch, TagRematch +from alpacloud.eztag.parser import FunctionCall, Parser, ParseState, RegexLiteral, StringLiteral, TokenTransformation, TokenTransformer, transformer + + +class TestParseState: + """Tests for ParseState helper class.""" + + def test_peek_at_beginning(self): + state = ParseState("hello") + assert state.peek() == "h" + assert state.pos == 0 # peek doesn't advance + + def test_peek_at_end(self): + state = ParseState("hi", pos=2) + assert state.peek() is None + + def test_consume_advances_position(self): + state = ParseState("hello") + assert 
state.consume() == "h" + assert state.pos == 1 + assert state.consume() == "e" + assert state.pos == 2 + + def test_consume_at_end_returns_none(self): + state = ParseState("a", pos=1) + assert state.consume() is None + + def test_consume_while_with_predicate(self): + state = ParseState("abc123") + result = state.consume_while(str.isalpha) + assert result == "abc" + assert state.pos == 3 + + def test_consume_while_returns_empty_on_no_match(self): + state = ParseState("123") + result = state.consume_while(str.isalpha) + assert result == "" + assert state.pos == 0 + + +class TestStringLiteral: + """Tests for StringLiteral parsing.""" + + def test_single_string_literal(self): + parser = Parser("func(arg1)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1")]) + + def test_multiple_string_literals(self): + parser = Parser("func(arg1, arg2, arg3)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1"), StringLiteral("arg2"), StringLiteral("arg3")]) + + def test_string_literal_with_numbers(self): + parser = Parser("func(arg123)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg123")]) + + def test_string_literal_with_underscore(self): + parser = Parser("func(my_arg)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("my_arg")]) + + def test_string_literal_with_spaces(self): + parser = Parser("func( arg1 )") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1")]) + + def test_mixed_string_literals_and_function_calls(self): + parser = Parser("func(arg1, nested(), arg2)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1"), FunctionCall("nested", []), StringLiteral("arg2")]) + + def test_string_literal_in_nested_function(self): + parser = Parser("outer(inner(literal))") + result = parser.parse() + assert result == FunctionCall("outer", [FunctionCall("inner", 
[StringLiteral("literal")])]) + + def test_multiple_string_literals_nested(self): + parser = Parser("func(a, b(c, d), e)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("a"), FunctionCall("b", [StringLiteral("c"), StringLiteral("d")]), StringLiteral("e")]) + + +class TestParser: + """Tests for Parser class.""" + + def test_simple_function_no_args(self): + parser = Parser("func()") + result = parser.parse() + assert result == FunctionCall("func", []) + + def test_function_with_single_arg(self): + parser = Parser("func(arg1)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1")]) + + def test_function_with_multiple_args(self): + parser = Parser("func(arg1, arg2, arg3)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1"), StringLiteral("arg2"), StringLiteral("arg3")]) + + def test_function_with_numeric_args(self): + parser = Parser("add(123, 456)") + result = parser.parse() + assert result == FunctionCall("add", [StringLiteral("123"), StringLiteral("456")]) + + def test_function_with_nested_function_call(self): + parser = Parser("func(nested(inner), arg2)") + result = parser.parse() + expected = FunctionCall("func", [FunctionCall("nested", [StringLiteral("inner")]), StringLiteral("arg2")]) + assert result == expected + + def test_function_with_deeply_nested_function_calls(self): + parser = Parser("func(a(b(c)), d)") + result = parser.parse() + expected = FunctionCall("func", [FunctionCall("a", [FunctionCall("b", [StringLiteral("c")])]), StringLiteral("d")]) + assert result == expected + + def test_function_with_spaces(self): + parser = Parser("func( arg1 , arg2 )") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg1"), StringLiteral("arg2")]) + + def test_function_with_leading_trailing_spaces(self): + parser = Parser(" func(arg) ") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("arg")]) + + 
def test_function_with_underscore_in_name(self): + parser = Parser("my_func(arg)") + result = parser.parse() + assert result == FunctionCall("my_func", [StringLiteral("arg")]) + + def test_function_with_numbers_in_name(self): + parser = Parser("func123(arg)") + result = parser.parse() + assert result == FunctionCall("func123", [StringLiteral("arg")]) + + def test_empty_string_raises_error(self): + parser = Parser("") + with pytest.raises(ValueError, match="Expected function name"): + parser.parse() + + def test_missing_opening_paren_raises_error(self): + parser = Parser("func") + with pytest.raises(ValueError, match="Expected opening parenthesis"): + parser.parse() + + def test_missing_closing_paren_raises_error(self): + parser = Parser("func(arg") + with pytest.raises(ValueError, match="Expected.*closing parenthesis"): + parser.parse() + + def test_missing_comma_between_args_raises_error(self): + parser = Parser("func(arg1 arg2)") + with pytest.raises(ValueError, match="Expected comma or closing parenthesis"): + parser.parse() + + def test_function_with_empty_arg_between_commas(self): + parser = Parser("func(arg1, , arg2)") + with pytest.raises(ValueError, match="Expected identifier"): + parser.parse() + + +class TestParserNested: + """Tests for Parser class with nested function calls.""" + + def test_complex_nested_example(self): + parser = Parser("outer(inner1(a, b), inner2(c), d)") + result = parser.parse() + expected = FunctionCall("outer", [FunctionCall("inner1", [StringLiteral("a"), StringLiteral("b")]), FunctionCall("inner2", [StringLiteral("c")]), StringLiteral("d")]) + assert result == expected + + def test_function_with_special_chars_in_args(self): + parser = Parser("func(arg-1, arg.2, arg@3)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral(value="arg-1"), StringLiteral(value="arg.2"), StringLiteral(value="arg@3")]) + + def test_multiple_nested_levels(self): + parser = Parser("f1(f2(f3(f4())), x)") + result = 
parser.parse() + expected = FunctionCall("f1", [FunctionCall("f2", [FunctionCall("f3", [FunctionCall("f4", [])])]), StringLiteral("x")]) + assert result == expected + + def test_nested_function_with_no_args(self): + parser = Parser("outer(inner())") + result = parser.parse() + expected = FunctionCall("outer", [FunctionCall("inner", [])]) + assert result == expected + + def test_multiple_nested_functions_as_args(self): + parser = Parser("func(a(), b(), c())") + result = parser.parse() + expected = FunctionCall("func", [FunctionCall("a", []), FunctionCall("b", []), FunctionCall("c", [])]) + assert result == expected + + def test_nested_with_mixed_args(self): + parser = Parser("outer(x, inner(y), z)") + result = parser.parse() + expected = FunctionCall("outer", [StringLiteral("x"), FunctionCall("inner", [StringLiteral("y")]), StringLiteral("z")]) + assert result == expected + + def test_deeply_nested_with_multiple_args(self): + parser = Parser("a(b(c(d, e), f), g)") + result = parser.parse() + expected = FunctionCall("a", [FunctionCall("b", [FunctionCall("c", [StringLiteral("d"), StringLiteral("e")]), StringLiteral("f")]), StringLiteral("g")]) + assert result == expected + + def test_string_literal_starts_with_number(self): + # Numbers at start mean it's not a valid identifier, so it's a plain arg + parser = Parser("func(123abc)") + result = parser.parse() + assert result == FunctionCall("func", [StringLiteral("123abc")]) + + def test_all_string_literals_in_complex_expr(self): + parser = Parser("and(or(a, b), not(c))") + result = parser.parse() + expected = FunctionCall("and", [FunctionCall("or", [StringLiteral("a"), StringLiteral("b")]), FunctionCall("not", [StringLiteral("c")])]) + assert result == expected + + def test_regex_literal(self): + parser = Parser("match(k, /v/)") + result = parser.parse() + assert result == FunctionCall("match", [StringLiteral("k"), RegexLiteral("v")]) + + def test_regex_literal_with_nesting(self): + parser = Parser("match(k, /match(k, 
v)/)") + result = parser.parse() + assert result == FunctionCall("match", [StringLiteral("k"), RegexLiteral("match(k, v)")]) + + +@dataclass +class FakeItem: + """Fake item for testing transformer.""" + + k: str + v: str = "default" + + +class TestTransformer: + """Tests for Transformer class.""" + + def test_simple_function(self): + assert transformer.transform(FunctionCall("NOT", [StringLiteral("x")])) == logic.Not_("x") + + def test_variadic_function(self): + assert transformer.transform(FunctionCall("AND", [StringLiteral("a"), StringLiteral("b"), StringLiteral("c")])) == logic.And_(["a", "b", "c"]) + + def test_multiple_args(self): + assert transformer.transform(FunctionCall("MATCH", [StringLiteral("k"), StringLiteral("v")])) == logic.TagMatch("k", "v") + + def test_multiple_with_default(self): + assert TokenTransformer({"TEST": TokenTransformation("TEST", FakeItem, ["k", "v"])}).transform(FunctionCall("TEST", [StringLiteral("k")])) == FakeItem("k") + + def test_recursive(self): + assert transformer.transform(FunctionCall("AND", [FunctionCall("NOT", [StringLiteral("x")])])) == logic.And_([logic.Not_("x")]) + + +class TestIntegration: + """Integration tests for Parser and Transformer.""" + + def test_example(self): + filter_str = "and(match(env, prd), re(name, /grafana.*/))" + result = transformer.transform(Parser(filter_str).parse()) + assert result == And_(conds=[TagMatch(k="env", v="prd"), TagRematch(k="name", v="grafana.*")]) diff --git a/alpacloud/eztag/readme.md b/alpacloud/eztag/readme.md new file mode 100644 index 0000000..ced4306 --- /dev/null +++ b/alpacloud/eztag/readme.md @@ -0,0 +1,127 @@ +# alpacloud.eztag + +`eztag` helps you easily filter things by tags. + +## Usage + +### Filtering in code + +You can directly invoke the filters as functions. 
For example: + +```python +from alpacloud.eztag.tag import TagSet + +@dataclass +class Snippet: + name: str + content: str + tags: TagSet +``` + +you can filter them with convenient syntax: + +```python +filter(lambda s: s.tags.has("python"), snippets) +``` + +There are several filter functions available: + +- has : check if a TagSet has a tag +- match : check if a TagSet has a key with a value +- rematch : check if a TagSet has a key with a value that matches a regex +- contains : check if a TagSet has a key whose value contains a substring + +### Filtering external data + +If your objects don't have tags, you can associate tag data with them using `Selector`: + +```python +from alpacloud.eztag.logic import TagMatch +from alpacloud.eztag.selector import Selector +from alpacloud.eztag.tag import TagSet + +tasks = Selector([ + (TagSet.from_dict({"env":"prd", "dangerous": "true"}), task0), + (TagSet.from_dict({"env":"stg", "dangerous": "false"}), task1), +]) + +dangerous_tasks = tasks.select(TagMatch("dangerous", "true")) +``` + +### Filtering from the CLI + +`eztag` allows you to input filters in a simple language from the CLI. You can build this filtering into your own tools. + + +```python +import click + +from alpacloud.eztag.parser import Parser, transformer +from alpacloud.eztag.selector import Selector + +tagged_tasks = Selector(...) 
+ +@click.command() +@click.argument("filter") +def cli(filter): + expr = transformer.transform(Parser(filter).parse()) + selected_tasks = tagged_tasks.select(expr) + + for task in selected_tasks: + task.run() +``` + +Then you can invoke it like this: +```shell +task-run 'and(match(env, prd), re(name, /cert.*/))' +``` + +## CLI filter usage + +Filter syntax is: +``` +regex_literal := "/" regex "/" +string_literal := any characters except "(),/" and spaces +expr := identifier(expr [, expr])* | regex_literal | string_literal +identifier := "and" | "or" | "not" | "has" | "match" | "re" | "contains" +``` + +The operations are: +- `and` : logical AND +- `or` : logical OR +- `not` : logical NOT +- `has` : check if a tag has a key +- `match` : check if a tag has a key with a value +- `re` : check if a tag has a key with a value that matches a regex +- `contains` : check if a tag has a key whose value contains a substring + +## Advanced usage + +### Adding custom filters or operators + +You can add your own filters or operators to streamline your use case. For example, if you shard your tasks, you can add a filter to run only on a specific shard and then filter with `AND(MATCH(env, prd), SHARD(5))` for example. + +1. Implement your filter as a subclass of `alpacloud.eztag.logic.Expr` + + ```python + from dataclasses import dataclass + from alpacloud.eztag.logic import Expr + + @dataclass(frozen=True) + class Shard(Expr): + """Run tasks with this shard identifier""" + shard: int + + def check(self, tags) -> bool: + return tags.contains("shard", str(self.shard)) + ``` + +2. 
Register your filter in the `transformer` dictionary: + + ```python + from alpacloud.eztag.parser import transformer, TokenTransformer, TokenTransformation + + my_transformer = transformer.extended({ + "SHARD": TokenTransformation("SHARD", Shard, args=["shard"]), + }) + ``` diff --git a/alpacloud/eztag/selector.py b/alpacloud/eztag/selector.py new file mode 100644 index 0000000..bcce757 --- /dev/null +++ b/alpacloud/eztag/selector.py @@ -0,0 +1,23 @@ +"""Select items based on their tags. Useful for when tags are external to the data.""" + +from typing import Generic, TypeVar + +from alpacloud.eztag.logic import Expr +from alpacloud.eztag.tag import TagSet + +Data = TypeVar("Data") + + +class Selector(Generic[Data]): + """Select items based on their tags""" + + def __init__(self, items: list[tuple[TagSet, Data]]): + self.items = items + + def select(self, expr: Expr) -> list[Data]: + """Select items based on a predicate""" + return [e[1] for e in self.select_with_tags(expr)] + + def select_with_tags(self, expr: Expr) -> list[tuple[TagSet, Data]]: + """Select items based on a predicate, returning their tags as well""" + return [e for e in self.items if expr.check(e[0])] diff --git a/alpacloud/eztag/tag.py b/alpacloud/eztag/tag.py new file mode 100644 index 0000000..fc7072b --- /dev/null +++ b/alpacloud/eztag/tag.py @@ -0,0 +1,53 @@ +"""TagSet implementation, includes operations on finding tags matching criteria""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Iterable + +from alpacloud.eztag.multidict import K, MultiDict, V + + +@dataclass +class TagSet: + """A set of tags""" + + ts: MultiDict + + @classmethod + def from_dict(cls, d: dict[K, V]) -> TagSet: + """Create a tagset from a dict of key-value pairs""" + return cls(MultiDict.from_dict(d)) + + @classmethod + def create(cls, d: dict[K, Iterable[V] | V]) -> TagSet: + """Create a tagset from a dict of key-value pairs or key-list of values pairs""" 
+ return cls(MultiDict.create(d)) + + def has(self, k: str) -> bool: + """Check if the key exists in the tagset""" + return k in self.ts + + def match(self, k: str, v: str | None) -> bool: + """Exact match the value for this key (returns True if any value matches)""" + if not self.has(k): + return False + return v in self.ts[k] + + def rematch(self, k: str, v: str | re.Pattern) -> bool: + """Regex match the value for this key (returns True if any value matches)""" + if isinstance(v, str): + v = re.compile(v) + + if not self.has(k): + return False + + return any(val is not None and v.fullmatch(val) is not None for val in self.ts[k]) + + def contains(self, k: str, v: str) -> bool: + """Check if any value for this key contains the substring""" + if not self.has(k): + return False + + return any(val is not None and v in val for val in self.ts[k]) diff --git a/alpacloud/eztag/tag_test.py b/alpacloud/eztag/tag_test.py new file mode 100644 index 0000000..e0f5669 --- /dev/null +++ b/alpacloud/eztag/tag_test.py @@ -0,0 +1,339 @@ +"""Tests for TagSet class""" + +from __future__ import annotations + +import re + +from alpacloud.eztag.multidict import MultiDict +from alpacloud.eztag.tag import TagSet + + +class TestTagSetHas: + """Tests for TagSet.has() method""" + + def test_has_returns_true_when_key_exists(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod", "region": "us-east-1"})) + assert tagset.has("env") is True + + def test_has_returns_false_when_key_does_not_exist(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.has("region") is False + + def test_has_returns_true_when_key_exists_with_none_value(self): + tagset = TagSet(ts=MultiDict.create({"env": None})) + assert tagset.has("env") is True + + def test_has_with_empty_tagset(self): + tagset = TagSet(ts=MultiDict.create({})) + assert tagset.has("any_key") is False + + def test_has_with_empty_string_key(self): + tagset = TagSet(ts=MultiDict.create({"": "value"})) + assert 
tagset.has("") is True + + def test_has_with_multiple_values_for_key(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod", "staging"]})) + assert tagset.has("env") is True + + +class TestTagSetMatch: + """Tests for TagSet.match() method""" + + def test_match_returns_true_when_key_and_value_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.match("env", "prod") is True + + def test_match_returns_false_when_value_does_not_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.match("env", "dev") is False + + def test_match_returns_false_when_key_does_not_exist(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.match("region", "us-east-1") is False + + def test_match_with_none_value(self): + tagset = TagSet(ts=MultiDict.create({"env": None})) + assert tagset.match("env", None) is True + + def test_match_none_value_against_string_returns_false(self): + tagset = TagSet(ts=MultiDict.create({"env": None})) + assert tagset.match("env", "prod") is False + + def test_match_string_value_against_none_returns_false(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.match("env", None) is False + + def test_match_with_empty_string_value(self): + tagset = TagSet(ts=MultiDict.create({"env": ""})) + assert tagset.match("env", "") is True + + def test_match_case_sensitive(self): + tagset = TagSet(ts=MultiDict.create({"env": "Prod"})) + assert tagset.match("env", "prod") is False + assert tagset.match("env", "Prod") is True + + def test_match_with_multiple_values_matches_any(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod", "staging", "dev"]})) + assert tagset.match("env", "prod") is True + assert tagset.match("env", "staging") is True + assert tagset.match("env", "dev") is True + assert tagset.match("env", "test") is False + + def test_match_with_multiple_values_including_none(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod", None]})) 
+ assert tagset.match("env", "prod") is True + assert tagset.match("env", None) is True + assert tagset.match("env", "dev") is False + + +class TestTagSetRematch: + """Tests for TagSet.rematch() method""" + + def test_rematch_with_string_pattern_matches(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.rematch("env", "prod") is True + + def test_rematch_with_string_pattern_does_not_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.rematch("env", "dev") is False + + def test_rematch_with_regex_pattern_matches(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod-01"})) + assert tagset.rematch("env", r"prod-\d+") is True + + def test_rematch_with_regex_pattern_does_not_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.rematch("env", r"prod-\d+") is False + + def test_rematch_with_compiled_pattern_matches(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod-01"})) + pattern = re.compile(r"prod-\d+") + assert tagset.rematch("env", pattern) is True + + def test_rematch_with_compiled_pattern_does_not_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + pattern = re.compile(r"prod-\d+") + assert tagset.rematch("env", pattern) is False + + def test_rematch_returns_false_when_key_does_not_exist(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.rematch("region", "us-.*") is False + + def test_rematch_with_wildcard_pattern(self): + tagset = TagSet(ts=MultiDict.create({"env": "production"})) + assert tagset.rematch("env", "prod.*") is True + + def test_rematch_with_alternation_pattern(self): + tagset = TagSet(ts=MultiDict.create({"env": "staging"})) + assert tagset.rematch("env", "prod|staging|dev") is True + + def test_rematch_requires_full_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "my-prod-env"})) + # Should not match because rematch uses fullmatch (not partial match) + assert tagset.rematch("env", 
"prod") is False + assert tagset.rematch("env", ".*prod.*") is True + + def test_rematch_with_empty_string_pattern(self): + tagset = TagSet(ts=MultiDict.create({"env": ""})) + assert tagset.rematch("env", "") is True + + def test_rematch_with_special_regex_characters(self): + tagset = TagSet(ts=MultiDict.create({"version": "1.2.3"})) + # Without escaping, '.' matches any character + assert tagset.rematch("version", r"1.2.3") is True + # With proper escaping + assert tagset.rematch("version", r"1\.2\.3") is True + + def test_rematch_case_sensitive_by_default(self): + tagset = TagSet(ts=MultiDict.create({"env": "Prod"})) + assert tagset.rematch("env", "prod") is False + assert tagset.rematch("env", "Prod") is True + + def test_rematch_with_case_insensitive_pattern(self): + tagset = TagSet(ts=MultiDict.create({"env": "Prod"})) + pattern = re.compile("prod", re.IGNORECASE) + assert tagset.rematch("env", pattern) is True + + def test_rematch_with_none_value_returns_false(self): + tagset = TagSet(ts=MultiDict.create({"env": None})) + # None values should be skipped + assert tagset.rematch("env", "prod") is False + + def test_rematch_with_multiple_values_matches_any(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod-01", "staging-02", "dev-03"]})) + assert tagset.rematch("env", r"prod-\d+") is True + assert tagset.rematch("env", r"staging-\d+") is True + assert tagset.rematch("env", r"dev-\d+") is True + assert tagset.rematch("env", r"test-\d+") is False + + def test_rematch_with_multiple_values_one_matches(self): + tagset = TagSet(ts=MultiDict.create({"env": ["production", "prod-01", "my-env"]})) + # Should match because at least one value matches + assert tagset.rematch("env", r"prod-\d+") is True + + def test_rematch_with_multiple_values_including_none(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod-01", None, "staging"]})) + # Should skip None and still find matches + assert tagset.rematch("env", r"prod-\d+") is True + assert tagset.rematch("env", 
"staging") is True + + +class TestTagSetContains: + """Tests for TagSet.contains() method""" + + def test_contains_returns_true_when_substring_exists(self): + tagset = TagSet(ts=MultiDict.create({"env": "production"})) + assert tagset.contains("env", "prod") is True + + def test_contains_returns_false_when_substring_does_not_exist(self): + tagset = TagSet(ts=MultiDict.create({"env": "production"})) + assert tagset.contains("env", "dev") is False + + def test_contains_returns_false_when_key_does_not_exist(self): + tagset = TagSet(ts=MultiDict.create({"env": "production"})) + assert tagset.contains("region", "us") is False + + def test_contains_with_exact_match(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + assert tagset.contains("env", "prod") is True + + def test_contains_with_empty_substring(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod"})) + # Empty string is contained in any string + assert tagset.contains("env", "") is True + + def test_contains_with_empty_string_value(self): + tagset = TagSet(ts=MultiDict.create({"env": ""})) + assert tagset.contains("env", "") is True + assert tagset.contains("env", "anything") is False + + def test_contains_with_none_value_returns_false(self): + tagset = TagSet(ts=MultiDict.create({"env": None})) + assert tagset.contains("env", "prod") is False + + def test_contains_case_sensitive(self): + tagset = TagSet(ts=MultiDict.create({"env": "Production"})) + assert tagset.contains("env", "Prod") is True + assert tagset.contains("env", "prod") is False + + def test_contains_with_substring_at_start(self): + tagset = TagSet(ts=MultiDict.create({"env": "production-east"})) + assert tagset.contains("env", "prod") is True + + def test_contains_with_substring_at_end(self): + tagset = TagSet(ts=MultiDict.create({"env": "my-prod"})) + assert tagset.contains("env", "prod") is True + + def test_contains_with_substring_in_middle(self): + tagset = TagSet(ts=MultiDict.create({"env": "my-prod-env"})) + assert 
tagset.contains("env", "prod") is True + + def test_contains_with_multiple_occurrences(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod-prod-prod"})) + assert tagset.contains("env", "prod") is True + + def test_contains_with_special_characters(self): + tagset = TagSet(ts=MultiDict.create({"version": "v1.2.3-beta"})) + assert tagset.contains("version", "1.2") is True + assert tagset.contains("version", "-beta") is True + assert tagset.contains("version", ".") is True + + def test_contains_with_whitespace(self): + tagset = TagSet(ts=MultiDict.create({"description": "prod environment"})) + assert tagset.contains("description", "prod env") is True + assert tagset.contains("description", " ") is True + + def test_contains_does_not_treat_substring_as_regex(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod123"})) + # The substring is literal, not a regex pattern + assert tagset.contains("env", r"\d+") is False + assert tagset.contains("env", "prod") is True + assert tagset.contains("env", "123") is True + + def test_contains_with_multiple_values_matches_any(self): + tagset = TagSet(ts=MultiDict.create({"env": ["production", "staging", "development"]})) + assert tagset.contains("env", "prod") is True + assert tagset.contains("env", "stag") is True + assert tagset.contains("env", "dev") is True + assert tagset.contains("env", "test") is False + + def test_contains_with_multiple_values_one_matches(self): + tagset = TagSet(ts=MultiDict.create({"env": ["my-env", "production", "other"]})) + # Should match because at least one value contains the substring + assert tagset.contains("env", "prod") is True + + def test_contains_with_multiple_values_including_none(self): + tagset = TagSet(ts=MultiDict.create({"env": ["production", None, "staging"]})) + # Should skip None and still find matches + assert tagset.contains("env", "prod") is True + assert tagset.contains("env", "stag") is True + + +class TestTagSetIntegration: + """Integration tests for TagSet""" + + def 
test_tagset_with_multiple_operations(self): + tagset = TagSet(ts=MultiDict.create({"env": "prod", "region": "us-east-1", "version": "1.2.3", "team": "platform"})) + + assert tagset.has("env") + assert tagset.match("env", "prod") + assert tagset.rematch("region", r"us-.*") + assert tagset.rematch("version", r"\d+\.\d+\.\d+") + assert tagset.contains("region", "east") + assert tagset.contains("team", "plat") + assert not tagset.has("missing_key") + + def test_tagset_empty_initialization(self): + tagset = TagSet(ts=MultiDict.create({})) + assert not tagset.has("any_key") + + def test_contains_vs_match_vs_rematch(self): + tagset = TagSet(ts=MultiDict.create({"env": "my-prod-environment"})) + + # match requires exact equality + assert not tagset.match("env", "prod") + assert tagset.match("env", "my-prod-environment") + + # contains checks for substring + assert tagset.contains("env", "prod") + assert tagset.contains("env", "environment") + + # rematch requires full regex match + assert not tagset.rematch("env", "prod") + assert tagset.rematch("env", r".*prod.*") + assert tagset.rematch("env", r"my-\w+-environment") + + def test_tagset_with_multiple_values_per_key(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod", "staging"], "region": ["us-east-1", "us-west-2"]})) + + # has() checks key existence + assert tagset.has("env") + assert tagset.has("region") + + # match() returns True if ANY value matches + assert tagset.match("env", "prod") + assert tagset.match("env", "staging") + assert not tagset.match("env", "dev") + + # rematch() returns True if ANY value matches + assert tagset.rematch("region", r"us-.*") + assert tagset.rematch("region", r".*east.*") + assert tagset.rematch("region", r".*west.*") + + # contains() returns True if ANY value contains substring + assert tagset.contains("env", "prod") + assert tagset.contains("env", "stag") + assert tagset.contains("region", "east") + assert tagset.contains("region", "west") + + def 
test_mixed_none_and_string_values(self): + tagset = TagSet(ts=MultiDict.create({"env": ["prod", None, "staging"]})) + + assert tagset.has("env") + assert tagset.match("env", "prod") + assert tagset.match("env", None) + assert tagset.match("env", "staging") + + # rematch and contains should skip None values + assert tagset.rematch("env", "prod") + assert tagset.contains("env", "prod")