24 changes: 12 additions & 12 deletions Makefile
@@ -46,33 +46,33 @@ reset-kafka: setup ## Reset kafka
.PHONY: reset-kafka

test-rebalance: build reset-kafka ## Run the rebalance integration test
-	python -m pytest python/integration_tests/test_consumer_rebalancing.py -s
-	rm -r python/integration_tests/.tests_output/test_consumer_rebalancing
+	python -m pytest integration_tests/test_consumer_rebalancing.py -s
+	rm -r integration_tests/.tests_output/test_consumer_rebalancing
.PHONY: test-rebalance

test-worker-processing: build reset-kafka ## Run the worker processing integration test
-	python -m pytest python/integration_tests/test_task_worker_processing.py -s
-	rm -r python/integration_tests/.tests_output/test_task_worker_processing
+	python -m pytest integration_tests/test_task_worker_processing.py -s
+	rm -r integration_tests/.tests_output/test_task_worker_processing
.PHONY: test-worker-processing

test-upkeep-retry: build reset-kafka ## Run the upkeep retry integration test
-	python -m pytest python/integration_tests/test_upkeep_retry.py -s
-	rm -r python/integration_tests/.tests_output/test_upkeep_retry
+	python -m pytest integration_tests/test_upkeep_retry.py -s
+	rm -r integration_tests/.tests_output/test_upkeep_retry
.PHONY: test-upkeep-retry

test-upkeep-expiry: build reset-kafka ## Run the upkeep expiry integration test
-	python -m pytest python/integration_tests/test_upkeep_expiry.py -s
-	rm -r python/integration_tests/.tests_output/test_upkeep_expiry
+	python -m pytest integration_tests/test_upkeep_expiry.py -s
+	rm -r integration_tests/.tests_output/test_upkeep_expiry
.PHONY: test-upkeep-expiry

test-upkeep-delay: build reset-kafka ## Run the upkeep delay integration test
-	python -m pytest python/integration_tests/test_upkeep_delay.py -s
-	rm -r python/integration_tests/.tests_output/test_upkeep_delay
+	python -m pytest integration_tests/test_upkeep_delay.py -s
+	rm -r integration_tests/.tests_output/test_upkeep_delay
.PHONY: test-upkeep-delay

test-failed-tasks: build reset-kafka ## Run the failed tasks integration test
-	python -m pytest python/integration_tests/test_failed_tasks.py -s
-	rm -r python/integration_tests/.tests_output/test_failed_tasks
+	python -m pytest integration_tests/test_failed_tasks.py -s
+	rm -r integration_tests/.tests_output/test_failed_tasks
.PHONY: test-failed-tasks

integration-test: test-rebalance test-worker-processing test-upkeep-retry test-upkeep-expiry test-upkeep-delay test-failed-tasks ## Run all integration tests
1 change: 1 addition & 0 deletions clients/python/.python-version
@@ -0,0 +1 @@
3.12.11
File renamed without changes.
82 changes: 82 additions & 0 deletions clients/python/pyproject.toml
@@ -0,0 +1,82 @@
[project]
name = "taskbroker-client"
version = "0.1.0"
description = "Taskbroker python client and worker runtime"
readme = "README.md"
dependencies = [
"sentry-arroyo>=2.33.1",
"sentry-sdk[http2]>=2.43.0",
"sentry-protos>=0.2.0",
"confluent_kafka>=2.3.0",
"cronsim>=2.6",
"grpcio==1.66.1",
"orjson>=3.10.10",
"protobuf>=5.28.3",
"types-protobuf>=6.30.2.20250703",
"redis>=3.4.1",
"redis-py-cluster>=2.1.0",
"zstandard>=0.18.0",
]

[dependency-groups]
dev = [
"devservices>=1.2.1",
"sentry-devenv>=1.22.2",
"black==24.10.0",
"pre-commit>=4.2.0",
"pytest>=8.3.3",
"flake8>=7.3.0",
"isort>=5.13.2",
"mypy>=1.17.1",
"time-machine>=2.16.0",
]

[build-system]
requires = ["uv_build>=0.8.2,<0.9.0"]
build-backend = "uv_build"

[tool.uv]
environments = ["sys_platform == 'darwin' or sys_platform == 'linux'"]

[[tool.uv.index]]
url = "https://pypi.devinfra.sentry.io/simple"
default = true

[tool.pytest.ini_options]
pythonpath = ["python"]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]

[tool.mypy]
mypy_path = "python"
explicit_package_bases = true
# minimal strictness settings
check_untyped_defs = true
no_implicit_reexport = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true
warn_redundant_casts = true
enable_error_code = ["ignore-without-code", "redundant-self"]
local_partial_types = true # compat with dmypy
disallow_any_generics = true
disallow_untyped_defs = true

# begin: missing 3rd party stubs
[[tool.mypy.overrides]]
module = [
"confluent_kafka.*",
]
ignore_missing_imports = true
# end: missing 3rd party stubs

[tool.black]
# File filtering is taken care of in pre-commit.
line-length = 100
target-version = ['py311']

[tool.isort]
profile = "black"
line_length = 100
lines_between_sections = 1
91 changes: 91 additions & 0 deletions clients/python/src/examples/example_app.py
@@ -0,0 +1,91 @@
"""
Example taskbroker application with tasks

Used in tests for the worker.
"""

import logging
from time import sleep
from typing import Any

from arroyo.backends.kafka import KafkaProducer
from redis import StrictRedis

from examples.store import StubAtMostOnce
from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError
from taskbroker_client.retry import retry_task as retry_task_helper

logger = logging.getLogger(__name__)


def producer_factory(topic: str) -> KafkaProducer:
# TODO use env vars for kafka host/port
config = {
"bootstrap.servers": "127.0.0.1:9092",
"compression.type": "lz4",
"message.max.bytes": 50000000, # 50MB
}
return KafkaProducer(config)


app = TaskbrokerApp(
producer_factory=producer_factory,
at_most_once_store=StubAtMostOnce(),
)

# Create a namespace and register tasks
exampletasks = app.taskregistry.create_namespace("examples")


@exampletasks.register(name="examples.simple_task")
def simple_task(*args: Any, **kwargs: Any) -> None:
sleep(0.1)
logger.debug("simple_task complete")


@exampletasks.register(name="examples.retry_task", retry=Retry(times=2))
def retry_task() -> None:
raise RetryTaskError


@exampletasks.register(name="examples.fail_task")
def fail_task() -> None:
raise ValueError("nope")


@exampletasks.register(name="examples.at_most_once", at_most_once=True)
def at_most_once_task() -> None:
pass


@exampletasks.register(
name="examples.retry_state", retry=Retry(times=2, times_exceeded=LastAction.Deadletter)
)
def retry_state() -> None:
try:
retry_task_helper()
except NoRetriesRemainingError:
# TODO read host from env vars
redis = StrictRedis(host="localhost", port=6379, decode_responses=True)
redis.set("no-retries-remaining", 1)


@exampletasks.register(
name="examples.will_retry",
retry=Retry(times=3, on=(RuntimeError,), times_exceeded=LastAction.Discard),
)
def will_retry(failure: str) -> None:
if failure == "retry":
logger.debug("going to retry with explicit retry error")
raise RetryTaskError
if failure == "raise":
logger.debug("raising runtimeerror")
raise RuntimeError("oh no")
logger.debug("got %s", failure)


@exampletasks.register(name="examples.timed")
def timed_task(sleep_seconds: float | str, *args: Any, **kwargs: Any) -> None:
sleep(float(sleep_seconds))
logger.debug("timed_task complete")
12 changes: 12 additions & 0 deletions clients/python/src/examples/store.py
@@ -0,0 +1,12 @@
from taskbroker_client.types import AtMostOnceStore


class StubAtMostOnce(AtMostOnceStore):
    """In-memory at-most-once store used by the example app and tests."""

    def __init__(self) -> None:
        self._keys: dict[str, str] = {}

    def add(self, key: str, value: str, timeout: int) -> bool:
        # The timeout is ignored in this stub; a real backend would expire the key.
        if key in self._keys:
            return False
        self._keys[key] = value
        return True
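
`StubAtMostOnce` keeps keys in memory and never expires them, which is enough for tests. As a hedged sketch (not part of this diff), a production store could lean on Redis' atomic `SET NX EX`, which matches the "atomic operations" note on `TaskbrokerApp.at_most_once_store` below; the class name and constructor arguments here are invented for illustration.

```python
# Sketch only: a Redis-backed AtMostOnceStore. SET with nx/ex is a single
# atomic command, so concurrent workers cannot both claim the same key.
from redis import StrictRedis

from taskbroker_client.types import AtMostOnceStore


class RedisAtMostOnce(AtMostOnceStore):
    def __init__(self, host: str = "localhost", port: int = 6379) -> None:
        self._redis = StrictRedis(host=host, port=port, decode_responses=True)

    def add(self, key: str, value: str, timeout: int) -> bool:
        # nx=True -> only set if the key does not exist; ex -> TTL in seconds.
        # True means the key was written, i.e. this is the first attempt.
        return bool(self._redis.set(key, value, nx=True, ex=timeout))
```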
Empty file.
112 changes: 112 additions & 0 deletions clients/python/src/taskbroker_client/app.py
@@ -0,0 +1,112 @@
import importlib
from collections.abc import Iterable
from typing import Any

from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation

from taskbroker_client.imports import import_string
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.registry import TaskRegistry
from taskbroker_client.router import TaskRouter
from taskbroker_client.types import AtMostOnceStore, ProducerFactory


class TaskbrokerApp:
"""
Container for an application's task setup and configuration.
"""

def __init__(
self,
producer_factory: ProducerFactory,
router_class: str | TaskRouter = "taskbroker_client.router.DefaultRouter",
metrics_class: str | MetricsBackend = "taskbroker_client.metrics.NoOpMetricsBackend",
at_most_once_store: AtMostOnceStore | None = None,
) -> None:
self.metrics = self._build_metrics(metrics_class)
self._config = {
"rpc_secret": None,
"grpc_config": None,
"at_most_once_timeout": None,
}
self._modules: Iterable[str] = []
self._taskregistry = TaskRegistry(
producer_factory=producer_factory,
router=self._build_router(router_class),
metrics=self.metrics,
)
self.at_most_once_store(at_most_once_store)

def _build_router(self, router_name: str | TaskRouter) -> TaskRouter:
if isinstance(router_name, str):
router_class = import_string(router_name)
router = router_class()
else:
router = router_name
assert hasattr(router, "route_namespace")

return router

def _build_metrics(self, backend_name: str | MetricsBackend) -> MetricsBackend:
if isinstance(backend_name, str):
metrics_class = import_string(backend_name)
return metrics_class()
return backend_name

@property
def taskregistry(self) -> TaskRegistry:
"""Get the TaskRegistry instance from this app"""
return self._taskregistry

@property
def config(self) -> dict[str, Any]:
"""Get the config data"""
return self._config

def set_config(self, config: dict[str, Any]) -> None:
"""Update configuration data"""
for key, value in config.items():
if key in self._config:
self._config[key] = value

def set_modules(self, modules: Iterable[str]) -> None:
"""
Set the list of modules containing tasks to be loaded by workers and schedulers.
"""
self._modules = modules

def load_modules(self) -> None:
"""Load all of the configured modules"""
for mod in self._modules:
__import__(mod)

def at_most_once_store(self, backend: AtMostOnceStore | None) -> None:
"""
Set the backend store for `at_most_once` tasks.
The storage implementation should support atomic operations
to avoid races with at_most_once tasks.
"""
self._at_most_once_store = backend

def should_attempt_at_most_once(self, activation: TaskActivation) -> bool:
if not self._at_most_once_store:
return True
key = get_at_most_once_key(activation.namespace, activation.taskname, activation.id)
return self._at_most_once_store.add(
key, "1", timeout=self._config["at_most_once_timeout"] or 60
)


def get_at_most_once_key(namespace: str, taskname: str, task_id: str) -> str:
# tw:amo -> taskworker:at_most_once
return f"tw:amo:{namespace}:{taskname}:{task_id}"


def import_app(app_module: str) -> TaskbrokerApp:
"""
Resolve an application path like `acme.worker.runtime:app`
into the `app` symbol defined in the module.
"""
module_name, name = app_module.split(":")
module = importlib.import_module(module_name)
return getattr(module, name)
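
To tie the pieces together, a hedged usage sketch (not part of this diff): `import_app` resolves a `module:attr` path such as the example app added above, and `should_attempt_at_most_once` consults the configured store under the key built by `get_at_most_once_key`. The `TaskActivation` field values are made up; the field names are the ones `get_at_most_once_key` reads.

```python
# Sketch only: resolve the example app and run the at-most-once check the
# way a worker might before executing an activation.
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation

from taskbroker_client.app import get_at_most_once_key, import_app

app = import_app("examples.example_app:app")

activation = TaskActivation(
    id="abc123",
    namespace="examples",
    taskname="examples.at_most_once",
)
print(get_at_most_once_key(activation.namespace, activation.taskname, activation.id))
# tw:amo:examples:examples.at_most_once:abc123

# First delivery is attempted; a duplicate of the same activation is not.
assert app.should_attempt_at_most_once(activation)
assert not app.should_attempt_at_most_once(activation)
```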