Add AI-step concurrency backpressure across flows #1820

@chubes4

Problem

The WordPress.com wiki firehose burn-in showed that Data Machine pipeline correctness is now much stronger, but concurrent AI calls can still overwhelm the provider transport layer.

Burn-in on intelligence-chubes4:

  • Flow 2: WordPress.com MGS history queue
  • Flow 10: WordPress.com GitHub platform history queue
  • Flow 11: WordPress.com scoped Slack history queue

All three were capped to max_items=1 and started together. The scheduler/drain path worked:

12 step executions, 12 completions, 0 Action Scheduler failures

But all three AI steps exhausted retries on OpenAI connect timeout:

cURL error 28: Connection timed out after 15000 milliseconds

Retries were correctly classified after #1819:

{
  "retry_class": "transport_connect_timeout",
  "delay_seconds": 15
}

The second retry then backed off to 30s before retries were exhausted. This shows the timeout/retry machinery is now behaving as intended, but the AI transport layer still needs backpressure.

Why This Matters

The point of the WordPress.com brain is to digest a source firehose into durable wiki memory:

source firehose
  -> scoped fetch lanes
  -> fresh-candidate selection
  -> AI triage
  -> reject_source / defer_item / wiki_upsert
  -> canonical wiki pages + provenance + freshness
  -> graph extraction + maintenance

Fetch lanes should be able to scale broadly, but AI digestion must be pressure-regulated. Without AI concurrency control, multiple flows can stampede OpenAI and convert a healthy backlog into repeated transport failures.

Desired Behavior

Add a Data Machine-level concurrency/backpressure layer for AI step execution:

  • Limit concurrent AI provider requests across flows.
  • Default should be conservative, likely 1 or 2 concurrent AI requests per site/provider.
  • Fetch/upsert/non-AI steps should continue normally where safe.
  • AI actions above the concurrency limit should be delayed/requeued, not failed.
  • The limiter should be generic, not Intelligence/WordPress.com/MGS-specific.
  • It should work with Action Scheduler and concurrent drain runs.

Initial Shape

A minimal implementation could be:

before AI request dispatch:
  acquire AI execution lease/provider slot
  if unavailable:
    reschedule action shortly and exit cleanly
  else:
    execute AI step
    release lease in finally
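
A concrete sketch of that shape in WordPress terms (all function names here are illustrative, not existing Data Machine APIs): wp_cache_add() is atomic on persistent object-cache backends, so it can serve as the lease, and as_schedule_single_action() can requeue throttled work.

// Hypothetical sketch, not existing Data Machine code. wp_cache_add()
// refuses to overwrite an existing key, so only one caller can hold a
// given slot; the TTL bounds leaked leases if a worker dies without
// releasing.
function dm_acquire_ai_slot( string $provider, int $limit = 1, int $ttl = 120 ) {
    for ( $slot = 0; $slot < $limit; $slot++ ) {
        $key = "dm_ai_slot_{$provider}_{$slot}";
        if ( wp_cache_add( $key, getmypid(), 'data-machine', $ttl ) ) {
            return $key; // lease handle
        }
    }
    return false; // all slots busy
}

function dm_release_ai_slot( string $key ): void {
    wp_cache_delete( $key, 'data-machine' );
}

// Wrapping an AI step action handler (names illustrative).
function dm_run_ai_step( int $job_id, array $args ): void {
    $lease = dm_acquire_ai_slot( 'openai', 1 );

    if ( false === $lease ) {
        // Above the limit: requeue shortly instead of failing.
        as_schedule_single_action(
            time() + 10,
            'dm_run_ai_step',
            array( $job_id, $args ),
            'data-machine'
        );
        return; // exit cleanly; nothing marked processed
    }

    try {
        dm_execute_ai_request( $job_id, $args ); // hypothetical: existing step logic
    } finally {
        // Released on success, failure, exception, and defer paths alike.
        dm_release_ai_slot( $lease );
    }
}

Two caveats on this sketch: the TTL matters, since without it a crashed worker would strand the slot until manual cleanup; and wp_cache_add() only provides cross-request atomicity with a persistent object cache, so plain installs would need a MySQL GET_LOCK() or transient-based lease instead.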

Potential dimensions:

  • global site AI concurrency
  • provider-level concurrency (openai, anthropic, etc.)
  • mode-level concurrency (pipeline; chat could be exempt or limited separately)

Start simple if needed: site-wide pipeline AI concurrency is enough for this burn-in.
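
If those dimensions are wanted later, the same lease generalizes just by changing what goes into the lock key; a hypothetical sketch (filter name illustrative):

// Hypothetical: scope the lease by composing the lock key from the
// dimensions in play. Dropping a dimension widens the scope.
function dm_ai_slot_key( string $provider, string $mode, int $slot ): string {
    $scope = apply_filters( 'dm_ai_concurrency_scope', array( $provider, $mode ) );
    return 'dm_ai_slot_' . implode( '_', $scope ) . "_{$slot}";
}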

Settings / Observability

Expose settings or filters for:

  • pipeline_ai_concurrency or similar
  • optional provider-specific limit
  • throttle retry/delay interval, e.g. 5-15s
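
As filters, that could look like the following (hook names hypothetical; defaults are the conservative values suggested above):

// Hypothetical filter names, shown as a sketch only.
function dm_ai_concurrency_limit( string $provider ): int {
    // Site-wide default: 1 concurrent pipeline AI request.
    $limit = (int) apply_filters( 'dm_pipeline_ai_concurrency', 1 );
    // Optional provider-specific override ('openai', 'anthropic', ...).
    return (int) apply_filters( "dm_pipeline_ai_concurrency_{$provider}", $limit );
}

function dm_ai_throttle_delay(): int {
    // Requeue interval when no slot is free; 5-15s suggested above.
    return (int) apply_filters( 'dm_ai_throttle_retry_seconds', 10 );
}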

Log when throttling happens:

{
  "job_id": 661,
  "flow_step_id": "...",
  "provider": "openai",
  "reason": "ai_concurrency_limit",
  "limit": 1,
  "active": 1,
  "rescheduled_for_seconds": 10
}

Expose enough metadata to distinguish:

  • provider transport failure
  • rate limit/backoff
  • AI concurrency throttle
  • actual model/tool failure

Acceptance Criteria

  • Add AI-step concurrency limiter/backpressure for pipeline AI execution.
  • Above-limit AI work is rescheduled/deferred cleanly, not failed or marked processed.
  • Lease/lock is released on success, failure, exception, retry scheduling, and tool-defer paths.
  • Tests prove (see the sketch after this list):
    • first AI step acquires slot
    • second concurrent AI step is throttled/rescheduled
    • release happens on failure/exception
    • non-AI steps are not blocked by AI concurrency
    • existing retry policy for cURL 28 still works when a slot is acquired
  • Logs include throttle metadata.
  • No source-specific logic for Intelligence, WordPress.com, MGS, Slack, GitHub, or Flow IDs.
  • No CHANGELOG.md edits or manual version bumps.
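
A hypothetical PHPUnit sketch of the first two test bullets, assuming the dm_acquire_ai_slot()/dm_release_ai_slot() helpers sketched earlier:

// Hypothetical test sketch; class and helper names are illustrative.
class DM_AI_Concurrency_Test extends WP_UnitTestCase {

    public function test_first_ai_step_acquires_slot() {
        $lease = dm_acquire_ai_slot( 'openai', 1 );
        $this->assertNotFalse( $lease );
        dm_release_ai_slot( $lease );
    }

    public function test_second_concurrent_ai_step_is_throttled() {
        $first  = dm_acquire_ai_slot( 'openai', 1 );
        $second = dm_acquire_ai_slot( 'openai', 1 );

        $this->assertNotFalse( $first );
        $this->assertFalse( $second, 'Second caller must be refused the slot.' );

        dm_release_ai_slot( $first );

        // Slot is available again after release.
        $this->assertNotFalse( dm_acquire_ai_slot( 'openai', 1 ) );
    }
}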

Runtime Evidence

Jobs from the failed 3-flow burn-in:

  • Flow 10 job 659: failed after 3 OpenAI connect timeouts.
  • Flow 11 job 660: failed after 3 OpenAI connect timeouts.
  • Flow 2 job 661: failed after 3 OpenAI connect timeouts.

All had fast timeout/retry behavior working; all failed because concurrent AI calls saturated the transport path.

Goal

Make Data Machine a pressure-regulated refinery: many fetch lanes can fill the candidate backlog, while AI digestion proceeds at a safe concurrency level and preserves candidate state through reject/defer/write outcomes.
