Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions pulse/packages/pulse-data/alembic/versions/012_pipeline_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""FDD-OPS-015 — pipeline_progress table for per-scope ingestion observability.

Per-scope progress tracking, separate from `pipeline_ingestion_progress`
(which is per-`entity_type` aggregate, 4 rows total). This table holds
~32+ rows during a backfill cycle (one per active scope: per Jira project,
per GitHub repo, per Jenkins job).

Schema choices:
- (tenant_id, entity_type, scope_key) is the natural primary index for
upsert-on-progress-tick. Using a UNIQUE constraint instead of PK to keep
`id UUID` for cross-table joins (consistent with other pipeline tables).
- `started_at` + `last_progress_at` allow the API to compute "stalled":
`last_progress_at < now() - interval '60 seconds'` while `status='running'`
- `status` enum (not actual SQL enum to avoid alter-type pain):
running | done | failed | paused | cancelled
- `last_error` is text — full traceback or short message, decided by emitter
- `items_per_second` is the rolling rate (worker computes via window)
- `eta_seconds` is computed: max(0, (estimate - done) / rate) when rate > 0

Retention: live tracking + historical (no auto-truncate). Recommended
external cron: `DELETE WHERE status IN ('done','failed') AND last_progress_at
< now() - interval '7 days'` to bound table size.

Revision ID: 012_pipeline_progress
Revises: 011_drop_legacy_watermark
Create Date: 2026-04-29
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql


# Alembic revision identifiers — consumed by the migration runner to
# order this migration after 011_drop_legacy_watermark.
revision: str = "012_pipeline_progress"
down_revision: Union[str, None] = "011_drop_legacy_watermark"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create pipeline_progress: per-scope ingestion progress tracking.

    One row per active scope (Jira project / GitHub repo / Jenkins job).
    (tenant_id, entity_type, scope_key) is UNIQUE — the upsert key for
    progress ticks — while ``id`` stays a UUID PK for cross-table joins,
    consistent with the other pipeline_* tables.
    """
    op.create_table(
        "pipeline_progress",
        sa.Column(
            "id",
            postgresql.UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", postgresql.UUID(as_uuid=True), nullable=False),
        # FDD-OPS-014 — scope_key follows the same convention used in
        # pipeline_watermarks: '<source>:<dimension>:<value>' (e.g.,
        # 'jira:project:BG', 'github:repo:foo/bar', 'jenkins:job:deploy-X').
        sa.Column("scope_key", sa.String(255), nullable=False),
        sa.Column("entity_type", sa.String(64), nullable=False),
        # Phase within the per-scope job lifecycle. The connector and worker
        # together transition: pre_flight → fetching → normalizing →
        # persisting → done (or → failed). 'pre_flight' is the count call;
        # 'persisting' covers both upsert and Kafka emit.
        sa.Column("phase", sa.String(32), nullable=False, server_default="pre_flight"),
        # Soft enum (see module docstring): running|done|failed|paused|cancelled.
        sa.Column("status", sa.String(16), nullable=False, server_default="running"),
        sa.Column("items_done", sa.Integer, nullable=False, server_default="0"),
        # NULLABLE: estimate may not be available (count call too expensive,
        # or source doesn't expose it). Worker falls back to heuristic
        # 'items_done × historical_rate'.
        sa.Column("items_estimate", sa.Integer, nullable=True),
        sa.Column("items_per_second", sa.Float, nullable=False, server_default="0.0"),
        # ETA in seconds remaining. NULL = unknown (no estimate yet). The
        # column is nullable, so NULL — not a -1 sentinel — is the
        # canonical "unknown" marker.
        sa.Column("eta_seconds", sa.Integer, nullable=True),
        sa.Column(
            "started_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("CURRENT_TIMESTAMP"),
        ),
        # Heartbeat column: "stalled" = status='running' AND this is older
        # than now() - 60s (see module docstring).
        sa.Column(
            "last_progress_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("CURRENT_TIMESTAMP"),
        ),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
        # Full traceback or short message — emitter decides.
        sa.Column("last_error", sa.Text, nullable=True),
        # TenantModel base columns (mirror other pipeline_* tables).
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("CURRENT_TIMESTAMP"),
        ),
        sa.Column(
            "updated_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("CURRENT_TIMESTAMP"),
        ),
        sa.UniqueConstraint(
            "tenant_id",
            "entity_type",
            "scope_key",
            name="uq_pipeline_progress_scope",
        ),
    )
    # Composite index for the "show me running jobs" query.
    op.create_index(
        "ix_pipeline_progress_tenant_status",
        "pipeline_progress",
        ["tenant_id", "status"],
    )
    # Partial index for "stalled" detection. Declared through
    # op.create_index(postgresql_where=...) instead of raw SQL so Alembic
    # autogenerate and offline (--sql) mode both see it.
    op.create_index(
        "ix_pipeline_progress_running_last_progress",
        "pipeline_progress",
        ["last_progress_at"],
        postgresql_where=sa.text("status = 'running'"),
    )


def downgrade() -> None:
    """Drop pipeline_progress and both of its indexes (reverse of upgrade)."""
    table = "pipeline_progress"
    # IF EXISTS keeps the raw-SQL drop idempotent if a partial rollback
    # already removed the index.
    op.execute(f"DROP INDEX IF EXISTS ix_{table}_running_last_progress")
    op.drop_index(f"ix_{table}_tenant_status", table_name=table)
    op.drop_table(table)
84 changes: 84 additions & 0 deletions pulse/packages/pulse-data/src/connectors/github_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,90 @@ async def get_source_count(self) -> int:
repos = await self._get_repos()
return len(repos)

async def count_prs_for_repo(
    self,
    repo_full_name: str,
    since: datetime | None = None,
    timeout_seconds: float = 10.0,
) -> int | None:
    """FDD-OPS-015 — pre-flight estimate of PRs in a repo.

    Uses GitHub GraphQL ``repository.pullRequests.totalCount``, which is
    a single cheap API call (no pagination). totalCount covers ALL states
    (OPEN/MERGED/CLOSED) regardless of ``since`` — date filtering would
    require the search API, which costs more rate-limit budget.

    Behavior on ``since``:
    - When ``since`` is provided, totalCount is an OVER-ESTIMATE
      (lifetime PR count for the repo, not just since ``since``); the
      worker's ETA may overshoot until rate measurements stabilize.
    - For full backfill (since=None), it's exact.

    Args:
        repo_full_name: "owner/name", or a bare name resolved against
            self._org.
        since: Watermark; accepted for signature parity with the other
            connectors but not applied to the count (see above).
        timeout_seconds: Budget for the GraphQL call; on overrun return
            None so the worker falls back to a heuristic estimate.

    Returns:
        Total PRs in repo, or None on failure/timeout/bad response.
    """
    # Hoisted out of the try block: an ImportError must not be swallowed
    # by the broad handler and mislogged as a GraphQL failure.
    import asyncio

    if "/" in repo_full_name:
        owner, name = repo_full_name.split("/", 1)
    else:
        owner, name = self._org, repo_full_name

    query = """
    query($owner: String!, $name: String!) {
      repository(owner: $owner, name: $name) {
        pullRequests(states: [OPEN, MERGED, CLOSED]) {
          totalCount
        }
      }
    }
    """

    try:
        response = await asyncio.wait_for(
            self._client.post(
                "/graphql",
                json_body={
                    "query": query,
                    "variables": {"owner": owner, "name": name},
                },
            ),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        logger.warning(
            "[count] %s: GraphQL totalCount exceeded %.1fs — None",
            repo_full_name, timeout_seconds,
        )
        return None
    except Exception:
        logger.exception(
            "[count] %s: GraphQL totalCount failed — None", repo_full_name,
        )
        return None

    # GraphQL can return partial errors; only bail when there is no data.
    if "errors" in response and response.get("data") is None:
        logger.warning(
            "[count] %s: GraphQL errors %s", repo_full_name,
            response.get("errors"),
        )
        return None

    try:
        count = response["data"]["repository"]["pullRequests"]["totalCount"]
    except (KeyError, TypeError):
        logger.warning(
            "[count] %s: unexpected GraphQL shape — None", repo_full_name,
        )
        return None

    # bool is a subclass of int — reject it along with any non-int value
    # so a malformed response can't masquerade as a count of 0/1.
    if not isinstance(count, int) or isinstance(count, bool):
        return None

    logger.info(
        "[count] %s: %d total PRs (since filter not applied — "
        "may over-estimate for incremental)",
        repo_full_name, count,
    )
    return count

async def fetch_pull_requests_batched(
self,
since: datetime | None = None,
Expand Down
59 changes: 59 additions & 0 deletions pulse/packages/pulse-data/src/connectors/jenkins_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,65 @@ async def close(self) -> None:
# Internal: Fetch and map builds
# ------------------------------------------------------------------

async def count_builds_for_job(
    self,
    job_name: str,
    since: datetime | None = None,
    timeout_seconds: float = 10.0,
) -> int | None:
    """FDD-OPS-015 — pre-flight estimate of builds in a Jenkins job.

    Requests a light tree (``builds[number,timestamp]``) capped at the
    100 most recent builds: exact for low-frequency jobs; for
    high-frequency jobs 100 acts as a lower-bound estimate the worker
    treats as a floor. Cheaper than the full build fetch (no
    result/duration fields).

    Args:
        job_name: Jenkins job path; '/' separators expand to the
            '/job/<segment>' URL form (folder jobs supported).
        since: Count only builds with timestamp >= since. Filtering is
            client-side — Jenkins cannot filter natively.
        timeout_seconds: Budget for the API call; None on overrun.

    Returns:
        Build count (possibly capped at 100), or None on failure/timeout.
    """
    # Hoisted out of the try block: an ImportError must not be swallowed
    # by the broad handler and mislogged as an API failure.
    import asyncio

    api_path = f"/job/{job_name.replace('/', '/job/')}/api/json"
    # Lighter tree than BUILD_TREE — only number + timestamp for filtering.
    params = {"tree": "builds[number,timestamp]{0,100}"}

    try:
        data = await asyncio.wait_for(
            self._client.get(api_path, params=params),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        logger.warning(
            "[count] %s: Jenkins builds list exceeded %.1fs — None",
            job_name, timeout_seconds,
        )
        return None
    except Exception:
        logger.exception(
            "[count] %s: Jenkins builds list failed — None", job_name,
        )
        return None

    builds = data.get("builds") or []
    if not since:
        logger.info(
            "[count] %s: %d builds (capped at 100 most recent)",
            job_name, len(builds),
        )
        return len(builds)

    # Jenkins timestamps are epoch millis; a missing timestamp defaults
    # to 0 and falls outside any realistic `since` window.
    # NOTE(review): since.timestamp() on a tz-naive datetime assumes the
    # local zone — confirm watermarks are stored tz-aware.
    since_ms = int(since.timestamp() * 1000)
    filtered = [b for b in builds if (b.get("timestamp") or 0) >= since_ms]
    logger.info(
        "[count] %s: %d builds since %s (of %d returned)",
        job_name, len(filtered), since.isoformat(), len(builds),
    )
    return len(filtered)

async def _fetch_job_builds(
self, job_name: str, since: datetime | None = None,
) -> list[dict[str, Any]]:
Expand Down
84 changes: 84 additions & 0 deletions pulse/packages/pulse-data/src/connectors/jira_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,90 @@ async def fetch_issues(
logger.info("Fetched %d issues from Jira (%d projects, %d pages)", len(all_issues), len(effective_projects), page)
return all_issues

async def count_issues_for_project(
    self,
    project_key: str,
    since: datetime | None = None,
    timeout_seconds: float = 10.0,
) -> int | None:
    """FDD-OPS-015 — pre-flight estimate of issues to fetch in a project.

    Uses POST /rest/api/3/search/approximate-count (Jira Cloud's dedicated
    cheap-count endpoint) with the same JQL that fetch_issues_batched
    will use. Returns the total count without paginating through items.

    Args:
        project_key: Jira project key (e.g., "BG").
        since: Watermark for incremental count. None = full project size.
        timeout_seconds: If the count call exceeds this budget, return None
            so the worker falls back to a heuristic estimate. Default 10s
            follows the FDD recommendation; tunable per call.

    Returns:
        Estimated issue count, or None if the count call failed/timed out.
        None is a defensive signal — worker MUST handle gracefully (use
        historical rate × elapsed as fallback).

    Notes:
        - Jira's `total` field on /search/jql is being phased out by
          Atlassian; approximate-count is the supported successor.
        - Approximate counts may be off by ±5% during heavy index churn.
          Acceptable for ETA — we only need order-of-magnitude.
        - Rate limit: counts towards Jira API quota (1 call per scope
          per cycle); negligible vs the per-issue fetch cost.
    """
    # Hoisted out of the try block: an ImportError must not be swallowed
    # by the broad handler and mislogged as an API failure.
    import asyncio

    # Same JQL shape as fetch_issues_batched for consistency. Escape
    # backslashes/quotes so an unusual key cannot break out of the JQL
    # string (a no-op for normal uppercase-alphanumeric Jira keys).
    safe_key = project_key.replace("\\", "\\\\").replace('"', '\\"')
    jql = f'project = "{safe_key}"'
    if since:
        # NOTE(review): strftime on a tz-naive datetime emits wall-clock
        # time with no zone; Jira interprets it in the caller's TZ —
        # confirm watermarks are stored consistently.
        since_str = since.strftime("%Y-%m-%d %H:%M")
        jql += f' AND updated >= "{since_str}"'

    body = {"jql": jql}

    try:
        # Timeout guard — if Jira's index is slow, don't block ingestion.
        data = await asyncio.wait_for(
            self._client.post(
                f"{REST_API}/search/approximate-count",
                json_body=body,
            ),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        logger.warning(
            "[count] %s: approximate-count exceeded %.1fs timeout — "
            "worker will use heuristic estimate",
            project_key, timeout_seconds,
        )
        return None
    except Exception:
        logger.exception(
            "[count] %s: approximate-count failed — falling back to None",
            project_key,
        )
        return None

    # Jira returns { "count": N }; older responses may use "total".
    count = data.get("count")
    if count is None:
        count = data.get("total")
    # bool is a subclass of int — reject it along with any non-int so a
    # malformed response can't masquerade as a count of 0/1.
    if count is None or not isinstance(count, int) or isinstance(count, bool):
        logger.warning(
            "[count] %s: unexpected response shape %r — None",
            project_key, list(data.keys())[:5],
        )
        return None

    logger.info(
        "[count] %s: estimated %d issues (since=%s)",
        project_key, count,
        since.isoformat() if since else "full-history",
    )
    return count

async def fetch_issues_batched(
self,
project_keys: list[str],
Expand Down
Loading
Loading