add distributed budget enforcement with Redis-backed tracker #131

@cchinchilla-dev

Description

WorkflowEngine._budget_spent (src/agentloom/core/engine.py:79) is a per-engine, per-process counter. BudgetExceededError fires when this counter exceeds WorkflowConfig.budget_usd. This works for single-process CLI runs but has no notion of cross-process or cross-pod aggregation.

In any distributed deployment (K8s with N pods running the same evaluation suite, AgentTest spinning up parallel evaluation jobs, batch runners across multiple workers), the worst-case total cost is N × budget_usd, because the budget is enforced locally, not globally. A test session configured at budget_usd: 10.00 running across 20 parallel pods can spend up to $200 without any single pod exceeding its limit. There is no "stop everything when collective spend hits $X" mechanism.
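The arithmetic can be made concrete with a small simulation. This is only a sketch, not AgentLoom code: `PodLocalBudget` is a hypothetical stand-in for the per-process counter, and the $0.50 call cost is an arbitrary assumption.

```python
from dataclasses import dataclass


@dataclass
class PodLocalBudget:
    """Hypothetical stand-in for one pod's per-process budget counter."""

    budget_usd: float
    spent_usd: float = 0.0

    def try_spend(self, cost_usd: float) -> bool:
        # Each pod only sees its own counter, never the fleet-wide total.
        if self.spent_usd + cost_usd > self.budget_usd:
            return False
        self.spent_usd += cost_usd
        return True


def worst_case_total(pods: int, budget_usd: float, call_cost_usd: float) -> float:
    """Spend every pod up to its local limit and return the collective total."""
    fleet = [PodLocalBudget(budget_usd) for _ in range(pods)]
    for pod in fleet:
        while pod.try_spend(call_cost_usd):
            pass
    return sum(pod.spent_usd for pod in fleet)


total = worst_case_total(pods=20, budget_usd=10.00, call_cost_usd=0.50)
print(f"collective spend: ${total:.2f}")  # collective spend: $200.00
```

No pod ever rejects a call it could afford locally, so the fleet as a whole reaches 20 × $10.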

This is the budget-side analog of #69 (distributed rate limiting). #69 prevents N pods from collectively exceeding the provider's RPM/TPM; this issue prevents N pods from collectively exceeding the user's budget.

Proposal

Add a pluggable BudgetTracker abstraction with a Redis-backed implementation, mirroring the architecture proposed in #69 for rate limiting.

1. Interface:

# src/agentloom/resilience/budget.py
from __future__ import annotations

from typing import Protocol

# ReservationToken (the opaque handle returned by reserve()) and
# BudgetExceededError are referenced below but not defined in this sketch.


class BudgetTracker(Protocol):
    """Tracks cumulative cost against a budget with optional distributed coordination."""

    async def reserve(self, scope: str, amount_usd: float) -> ReservationToken:
        """Reserve a pre-flight cost estimate.

        Raises BudgetExceededError if the reservation would exceed the budget.
        Returns a token for a later commit() or release()."""

    async def commit(self, token: ReservationToken, actual_usd: float) -> None:
        """Commit the actual cost, replacing the reserved estimate with it."""

    async def release(self, token: ReservationToken) -> None:
        """Release the reservation without committing (call failed before charge)."""

    async def get_spent(self, scope: str) -> float:
        """Current total spend for a scope."""

    async def get_budget(self, scope: str) -> float | None:
        """Configured budget for a scope, if any."""

Two reference implementations:

  • LocalBudgetTracker: in-process dict; current behavior, default.
  • RedisBudgetTracker: atomic updates on Redis keys (INCRBYFLOAT for spend, since INCR/DECR only accept integers); cross-pod global state.
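A minimal sketch of what the local implementation could look like, to show that the interface is satisfiable with plain dicts. The field layout of `ReservationToken`, the constructor argument, and the local `BudgetExceededError` class are assumptions made for this example; the real definitions may differ.

```python
from __future__ import annotations

import asyncio
import uuid
from dataclasses import dataclass


class BudgetExceededError(Exception):
    """Stand-in for AgentLoom's existing exception of the same name."""


@dataclass(frozen=True)
class ReservationToken:
    # Field layout is an assumption; the issue does not define the token.
    scope: str
    token_id: str
    amount_usd: float


class LocalBudgetTracker:
    """In-process tracker sketch: plain dicts guarded by one asyncio.Lock."""

    def __init__(self, budgets: dict[str, float]) -> None:
        self._budgets = dict(budgets)
        self._spent: dict[str, float] = {}
        self._reserved: dict[str, ReservationToken] = {}
        self._lock = asyncio.Lock()

    def _reserved_total(self, scope: str) -> float:
        return sum(t.amount_usd for t in self._reserved.values() if t.scope == scope)

    async def reserve(self, scope: str, amount_usd: float) -> ReservationToken:
        async with self._lock:
            limit = self._budgets.get(scope)
            in_use = self._spent.get(scope, 0.0) + self._reserved_total(scope)
            if limit is not None and in_use + amount_usd > limit:
                raise BudgetExceededError(f"{scope}: no headroom for ${amount_usd:.2f}")
            token = ReservationToken(scope, uuid.uuid4().hex, amount_usd)
            self._reserved[token.token_id] = token
            return token

    async def commit(self, token: ReservationToken, actual_usd: float) -> None:
        async with self._lock:
            # Drop the estimate and record what the call really cost.
            self._reserved.pop(token.token_id, None)
            self._spent[token.scope] = self._spent.get(token.scope, 0.0) + actual_usd

    async def release(self, token: ReservationToken) -> None:
        async with self._lock:
            self._reserved.pop(token.token_id, None)

    async def get_spent(self, scope: str) -> float:
        return self._spent.get(scope, 0.0)

    async def get_budget(self, scope: str) -> float | None:
        return self._budgets.get(scope)
```

A scope with no configured budget is treated as unlimited here, which matches `get_budget()` returning `None`; whether that is the desired default is a design decision for the PR.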

2. Scoping:

A "scope" is a hierarchical identifier:

session:eval-2026-04-12          # whole evaluation session, max $50
  workflow:scenario-001          # this workflow run, max $5
  workflow:scenario-002          # this workflow run, max $5
  ...

Multiple scopes apply per call; the call must fit within ALL of them. The CLI / AgentTest creates the session scope at startup; each workflow inherits it and adds its own workflow scope.

config:
  budget_usd: 5.00              # workflow-level, existing
  budget_scopes:                # NEW: additional scopes
    - name: session
      key: "{env.AGENTLOOM_SESSION_ID}"
      budget_usd: 50.00
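The "must fit within ALL of them" rule could be sketched as an all-or-nothing reservation across scopes. `ScopeState` and `reserve_in_all` are hypothetical names used only for illustration, and the spend figures are made up.

```python
from dataclasses import dataclass


@dataclass
class ScopeState:
    """Hypothetical in-memory view of one scope's limit and usage."""

    budget_usd: float
    spent_usd: float = 0.0
    reserved_usd: float = 0.0

    @property
    def headroom_usd(self) -> float:
        return self.budget_usd - self.spent_usd - self.reserved_usd


def reserve_in_all(scopes: dict[str, ScopeState], names: list[str], amount_usd: float) -> bool:
    """Reserve amount_usd in every named scope, or in none of them."""
    # Check first so that a rejection leaves every scope untouched.
    if any(scopes[name].headroom_usd < amount_usd for name in names):
        return False
    for name in names:
        scopes[name].reserved_usd += amount_usd
    return True


scopes = {
    "session:eval-2026-04-12": ScopeState(budget_usd=50.00, spent_usd=49.50),
    "workflow:scenario-001": ScopeState(budget_usd=5.00, spent_usd=1.00),
}
call_scopes = ["session:eval-2026-04-12", "workflow:scenario-001"]

print(reserve_in_all(scopes, call_scopes, 0.25))  # True: fits both scopes
print(reserve_in_all(scopes, call_scopes, 0.50))  # False: session has 0.25 left
```

The second call is rejected by the session scope even though the workflow scope still has several dollars of headroom.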

3. Pre-flight reservation:

Before each LLM call, estimate cost (per #12 pre-flight estimation):

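estimated_cost = pricing.estimate(model, estimated_input_tokens, estimated_output_tokens)
token = await budget_tracker.reserve(scope=workflow_run_id, amount_usd=estimated_cost)
try:
    response = await gateway.complete(...)
except BaseException:
    # Failed or cancelled before any charge: free the reservation.
    # BaseException also covers asyncio.CancelledError from parallel cancellation.
    await budget_tracker.release(token)
    raise
# Commit outside the try block so a commit failure never releases a charged call.
await budget_tracker.commit(token, actual_usd=response.cost_usd)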

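This closes the post-hoc overshoot tracked in #108: parallel steps can't all start when there's no headroom because each must reserve before dispatching. Any remaining overshoot is limited to the gap between the estimated and the actual cost of in-flight calls.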

4. Redis backend semantics:

Key: agentloom:budget:spent:<scope>           (string, float, updated with INCRBYFLOAT)
Key: agentloom:budget:reserved:<scope>        (sorted set: token → amount)
Key: agentloom:budget:limit:<scope>           (string, configured limit)

reserve(scope, amount):
  reserved = sum of scores in agentloom:budget:reserved:<scope>
  spent = GET agentloom:budget:spent:<scope>
  limit = GET agentloom:budget:limit:<scope>
  if limit is set and (spent + reserved + amount) > limit: raise
  ZADD agentloom:budget:reserved:<scope> amount token
  return token

commit(token, actual):
  ZREM agentloom:budget:reserved:<scope> token
  INCRBYFLOAT agentloom:budget:spent:<scope> actual

release(token):
  ZREM agentloom:budget:reserved:<scope> token

Use Lua scripts for atomicity (check + increment in one round-trip).
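One possible shape for the atomic reserve step is a single Lua script invoked through `EVAL`. This is a sketch under assumptions: `client` is taken to be a redis-py `redis.asyncio.Redis` instance, the helper names `budget_keys` and `reserve` are hypothetical, and reservations are summed from the sorted set's scores.

```python
from __future__ import annotations

import uuid

# KEYS[1] = spent, KEYS[2] = reserved (sorted set), KEYS[3] = limit
# ARGV[1] = token, ARGV[2] = amount
RESERVE_LUA = """
local spent = tonumber(redis.call('GET', KEYS[1]) or '0')
local limit = redis.call('GET', KEYS[3])
local reserved = 0
local entries = redis.call('ZRANGE', KEYS[2], 0, -1, 'WITHSCORES')
for i = 2, #entries, 2 do
  reserved = reserved + tonumber(entries[i])
end
if limit and spent + reserved + tonumber(ARGV[2]) > tonumber(limit) then
  return 0
end
redis.call('ZADD', KEYS[2], ARGV[2], ARGV[1])
return 1
"""


def budget_keys(scope: str) -> list[str]:
    """Build the three keys for a scope, in the order the script expects."""
    prefix = "agentloom:budget"
    return [
        f"{prefix}:spent:{scope}",
        f"{prefix}:reserved:{scope}",
        f"{prefix}:limit:{scope}",
    ]


async def reserve(client, scope: str, amount_usd: float) -> str | None:
    """Return a reservation token if the amount fits, else None.

    `client` is assumed to expose redis-py's eval(script, numkeys, *keys_and_args).
    """
    token = uuid.uuid4().hex
    ok = await client.eval(RESERVE_LUA, 3, *budget_keys(scope), token, repr(amount_usd))
    return token if ok == 1 else None
```

Because the check and the `ZADD` run inside one script, two pods cannot both pass the check against the same headroom. Summing the sorted set is linear in the number of outstanding reservations, and on Redis Cluster the three keys would need a shared hash tag to be usable from one script.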

5. CLI:

agentloom run workflow.yaml \
  --budget-tracker redis \
  --budget-redis-url redis://localhost:6379 \
  --budget-session-id eval-2026-04-12 \
  --budget-session-limit 50.00
# Inspect ongoing budget state
agentloom budget status --session eval-2026-04-12
# Spent: $32.40 / $50.00 (64.8%)
# Reserved: $1.20
# Per-workflow breakdown:
#   scenario-001: $4.50
#   scenario-002: $4.20
#   ...
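The status output above could be rendered by a small pure function, which also makes the CLI easy to cover in `test_budget_status_cli_shows_breakdown`. `render_status` and its parameters are hypothetical names for this sketch.

```python
from __future__ import annotations


def render_status(
    spent_usd: float,
    limit_usd: float,
    reserved_usd: float,
    per_workflow: dict[str, float],
) -> str:
    """Format a session's budget state in the layout shown above."""
    pct = 100.0 * spent_usd / limit_usd
    lines = [
        f"Spent: ${spent_usd:.2f} / ${limit_usd:.2f} ({pct:.1f}%)",
        f"Reserved: ${reserved_usd:.2f}",
        "Per-workflow breakdown:",
    ]
    # Sort by workflow name so the output is stable between runs.
    lines += [f"  {name}: ${usd:.2f}" for name, usd in sorted(per_workflow.items())]
    return "\n".join(lines)


print(render_status(32.40, 50.00, 1.20, {"scenario-001": 4.50, "scenario-002": 4.20}))
```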

6. Observability:

  • Existing agentloom_budget_remaining_usd{workflow} gauge extended with scope dimension.
  • New gauge agentloom_budget_reserved_usd{scope} — outstanding reservations.
  • Counter agentloom_budget_rejections_total{scope, reason}.

Scope

  • src/agentloom/resilience/budget.py: BudgetTracker Protocol, LocalBudgetTracker, RedisBudgetTracker.
  • src/agentloom/core/engine.py: replace inline _budget_spent with BudgetTracker calls; reserve before dispatch, commit after.
  • src/agentloom/core/models.py: WorkflowConfig.budget_scopes.
  • src/agentloom/cli/run.py: --budget-tracker, --budget-redis-url, --budget-session-id, --budget-session-limit flags.
  • src/agentloom/cli/budget.py: new agentloom budget status command.
  • src/agentloom/observability/metrics.py: new metrics with scope label.
  • pyproject.toml: redis extra (shared with #69 if landed).
  • tests/resilience/test_budget_local.py, test_budget_redis.py.

Regression tests

  • test_local_budget_tracker_existing_behavior_unchanged
  • test_local_budget_reserve_and_commit_track_correctly
  • test_local_budget_release_decrements_reserved
  • test_redis_budget_tracker_concurrent_reserves_atomic
  • test_redis_budget_two_pods_share_state (needs fakeredis)
  • test_pre_flight_reservation_prevents_overshoot (related to #108)
  • test_session_scope_enforced_in_addition_to_workflow_scope
  • test_budget_status_cli_shows_breakdown

Notes

  • Sibling of #69 (distributed rate limiting). Same Redis dependency, similar Lua-script pattern, similar abstraction shape. Coordinate the two PRs to share infrastructure (Redis client setup, error handling).
  • This is the prerequisite for AgentTest's session-level budget enforcement mentioned in agenttest-planteamiento.md: "AgentTest configures a budget per testing session and delegates its enforcement to AgentLoom."
  • Pre-flight reservation also addresses the post-hoc overshoot bug tracked in #108; the right fix for that bug is to reserve before dispatch, not just to lock the local counter.
  • Local tracker remains the default. Distributed is opt-in; no new core dependencies.
  • Pairs with #12 (pre-flight estimation): that issue produces the estimate that this issue's reserve() consumes.

Metadata

Labels: enhancement (New feature or request), resilience (Circuit breaker, retry, rate limiter)