diff --git a/src/primer/server/services/maturity_context_quality.py b/src/primer/server/services/maturity_context_quality.py new file mode 100644 index 0000000..a48092b --- /dev/null +++ b/src/primer/server/services/maturity_context_quality.py @@ -0,0 +1,281 @@ +"""Project readiness and context quality helpers for maturity analytics.""" + +from datetime import datetime +from typing import Any + +from sqlalchemy.orm import Session + +from primer.common.models import GitRepository, SessionFacets +from primer.common.models import Session as SessionModel +from primer.common.schemas import ContextQualityEntry, ProjectReadinessEntry +from primer.server.services.maturity_harness_fingerprints import context_signal_count + + +def build_project_readiness_and_context_quality( + *, + db: Session, + session_id_subq: Any, + model_rows: list[Any], + per_session: dict[str, dict[str, int]], + sessions_analyzed: int, +) -> tuple[list[ProjectReadinessEntry], list[ContextQualityEntry]]: + """Build repository readiness and context quality rows for scoped sessions.""" + project_readiness: list[ProjectReadinessEntry] = [] + context_quality: list[ContextQualityEntry] = [] + if sessions_analyzed <= 0: + return project_readiness, context_quality + + repo_context_buckets = _build_repo_context_buckets( + db=db, + session_id_subq=session_id_subq, + model_rows=model_rows, + per_session=per_session, + ) + if not repo_context_buckets: + return project_readiness, context_quality + + repos = ( + db.query(GitRepository) + .filter( + GitRepository.id.in_(list(repo_context_buckets.keys())), + ) + .all() + ) + repos_by_id = {repo.id: repo for repo in repos} + for repo in repos: + if repo.ai_readiness_score is not None: + project_readiness.append( + ProjectReadinessEntry( + repository=repo.full_name, + has_claude_md=repo.has_claude_md or False, + has_agents_md=repo.has_agents_md or False, + has_claude_dir=repo.has_claude_dir or False, + ai_readiness_score=repo.ai_readiness_score or 0.0, + session_count=len(repo_context_buckets[repo.id]["sessions"]), + ) + ) + + for repository_id, bucket in repo_context_buckets.items(): + repo = repos_by_id.get(repository_id) + if repo is None: + continue + context_quality.append(_build_context_quality_entry(repo, bucket)) + + project_readiness.sort(key=lambda p: p.ai_readiness_score, reverse=True) + context_quality.sort(key=lambda row: (-row.context_quality_score, -row.session_count)) + return project_readiness, context_quality + + +def _build_repo_context_buckets( + *, + db: Session, + session_id_subq: Any, + model_rows: list[Any], + per_session: dict[str, dict[str, int]], +) -> dict[str, dict[str, Any]]: + repo_session_rows = ( + db.query( + SessionModel.id, + SessionModel.repository_id, + SessionModel.input_tokens, + SessionModel.cache_read_tokens, + SessionModel.source_metadata, + ) + .filter( + SessionModel.id.in_(db.query(session_id_subq.c.id)), + SessionModel.repository_id.isnot(None), + ) + .all() + ) + facet_session_ids = { + row.session_id + for row in ( + db.query(SessionFacets.session_id) + .filter(SessionFacets.session_id.in_(db.query(session_id_subq.c.id))) + .all() + ) + } + model_session_ids = {session_id for session_id, *_rest in model_rows} + repo_context_buckets: dict[str, dict[str, Any]] = {} + for ( + session_id, + repository_id, + input_tokens, + cache_read_tokens, + source_metadata, + ) in repo_session_rows: + if repository_id is None: + continue + bucket = repo_context_buckets.setdefault( + repository_id, + { + "sessions": set(), + "input_tokens": 0, + "cache_read_tokens": 0, + "tool_sessions": set(), + "model_sessions": set(), + "facet_sessions": set(), + "context_usage_sessions": set(), + }, + ) + bucket["sessions"].add(session_id) + bucket["input_tokens"] += input_tokens or 0 + bucket["cache_read_tokens"] += cache_read_tokens or 0 + if per_session.get(session_id): + bucket["tool_sessions"].add(session_id) + if session_id in model_session_ids: + bucket["model_sessions"].add(session_id) + if session_id in facet_session_ids: + bucket["facet_sessions"].add(session_id) + if context_signal_count(source_metadata) > 0: + bucket["context_usage_sessions"].add(session_id) + return repo_context_buckets + + +def _build_context_quality_entry( + repo: GitRepository, + bucket: dict[str, Any], +) -> ContextQualityEntry: + session_count = len(bucket["sessions"]) + input_tokens = bucket["input_tokens"] + cache_read_tokens = bucket["cache_read_tokens"] + token_denominator = input_tokens + cache_read_tokens + cache_hit_rate = ( + round(cache_read_tokens / token_denominator, 3) if token_denominator > 0 else None + ) + avg_input_tokens = ( + round(input_tokens / session_count, 1) + if session_count > 0 and token_denominator > 0 + else None + ) + + guide_coverage_score = _guide_coverage_score(repo) + guide_freshness_score = _guide_freshness_score(repo.ai_readiness_checked_at) + context_usage_coverage_pct = _coverage_pct( + len(bucket["context_usage_sessions"]), + session_count, + ) + tool_coverage_pct = _coverage_pct(len(bucket["tool_sessions"]), session_count) + model_coverage_pct = _coverage_pct(len(bucket["model_sessions"]), session_count) + facet_coverage_pct = _coverage_pct(len(bucket["facet_sessions"]), session_count) + sensor_coverage_score = round( + (context_usage_coverage_pct + tool_coverage_pct + model_coverage_pct + facet_coverage_pct) + / 4, + 1, + ) + cache_score = min((cache_hit_rate or 0.0) / 0.5, 1.0) + token_efficiency_score = round( + ((cache_score * 0.6) + (_prompt_efficiency_score(avg_input_tokens) * 0.4)) * 100, + 1, + ) + context_quality_score = round( + (guide_coverage_score * 0.30) + + (guide_freshness_score * 0.15) + + (token_efficiency_score * 0.25) + + (sensor_coverage_score * 0.30), + 1, + ) + + return ContextQualityEntry( + repository=repo.full_name, + session_count=session_count, + context_quality_score=context_quality_score, + guide_coverage_score=round(guide_coverage_score, 1), + guide_freshness_score=guide_freshness_score, + token_efficiency_score=token_efficiency_score, + sensor_coverage_score=sensor_coverage_score, + cache_hit_rate=cache_hit_rate, + avg_input_tokens=avg_input_tokens, + context_usage_coverage_pct=context_usage_coverage_pct, + tool_coverage_pct=tool_coverage_pct, + model_coverage_pct=model_coverage_pct, + facet_coverage_pct=facet_coverage_pct, + has_claude_md=repo.has_claude_md or False, + has_agents_md=repo.has_agents_md or False, + readiness_checked_at=repo.ai_readiness_checked_at, + top_gaps=_context_quality_gaps( + repo=repo, + guide_freshness_score=guide_freshness_score, + cache_hit_rate=cache_hit_rate, + avg_input_tokens=avg_input_tokens, + context_usage_coverage_pct=context_usage_coverage_pct, + tool_coverage_pct=tool_coverage_pct, + model_coverage_pct=model_coverage_pct, + facet_coverage_pct=facet_coverage_pct, + )[:4], + ) + + +def _guide_coverage_score(repo: GitRepository) -> float: + if repo.ai_readiness_score is not None: + return repo.ai_readiness_score + return ( + (50.0 if repo.has_claude_md else 0.0) + + (20.0 if repo.has_agents_md else 0.0) + + (30.0 if repo.has_claude_dir else 0.0) + ) + + +def _coverage_pct(count: int, total: int) -> float: + return round((count / total) * 100, 1) if total > 0 else 0.0 + + +def _guide_freshness_score(checked_at: datetime | None) -> float: + if checked_at is None: + return 0.0 + now = datetime.now(tz=checked_at.tzinfo) if checked_at.tzinfo else datetime.now() + age_days = max((now - checked_at).days, 0) + if age_days <= 14: + return 100.0 + if age_days <= 30: + return 80.0 + if age_days <= 90: + return 50.0 + return 25.0 + + +def _prompt_efficiency_score(avg_input_tokens: float | None) -> float: + if avg_input_tokens is None: + return 0.0 + if avg_input_tokens <= 20_000: + return 1.0 + if avg_input_tokens <= 50_000: + return 0.8 + if avg_input_tokens <= 100_000: + return 0.5 + return 0.25 + + +def _context_quality_gaps( + *, + repo: GitRepository, + guide_freshness_score: float, + cache_hit_rate: float | None, + avg_input_tokens: float | None, + context_usage_coverage_pct: float, + tool_coverage_pct: float, + model_coverage_pct: float, + facet_coverage_pct: float, +) -> list[str]: + gaps: list[str] = [] + if not repo.has_claude_md: + gaps.append("Add CLAUDE.md") + if not repo.has_agents_md: + gaps.append("Add AGENTS.md") + if guide_freshness_score < 75: + gaps.append("Refresh guidance scan") + if cache_hit_rate is None: + gaps.append("Add token/cache telemetry") + elif cache_hit_rate < 0.25: + gaps.append("Improve cache reuse") + if avg_input_tokens is not None and avg_input_tokens > 50_000: + gaps.append("Trim prompt/context payloads") + if context_usage_coverage_pct < 50: + gaps.append("Increase context telemetry coverage") + if tool_coverage_pct < 90: + gaps.append("Complete tool telemetry") + if model_coverage_pct < 90: + gaps.append("Complete model telemetry") + if facet_coverage_pct < 90: + gaps.append("Complete outcome facets") + return gaps diff --git a/src/primer/server/services/maturity_harness_fingerprints.py b/src/primer/server/services/maturity_harness_fingerprints.py new file mode 100644 index 0000000..fc8f23e --- /dev/null +++ b/src/primer/server/services/maturity_harness_fingerprints.py @@ -0,0 +1,177 @@ +"""Harness configuration fingerprint helpers for maturity analytics.""" + +from collections import Counter +from hashlib import sha256 +from typing import Any + +from primer.common.facet_taxonomy import is_success_outcome +from primer.common.schemas import EngineerLeverageProfile, HarnessConfigurationFingerprint + +DEFAULT_COMPOUND_RELIABILITY_CHAIN_LENGTH = 10 + + +def context_signal_count(source_metadata: object) -> int: + """Return the number of native context references captured for a session.""" + if not isinstance(source_metadata, dict): + return 0 + native_telemetry = source_metadata.get("native_telemetry") + if not isinstance(native_telemetry, dict): + return 0 + context_usage = native_telemetry.get("context_usage") + if isinstance(context_usage, dict): + reference_count = context_usage.get("reference_count") + if isinstance(reference_count, int | float): + return max(int(reference_count), 0) + return 0 + + +def build_harness_configuration_fingerprints( + *, + source_metadata_rows: list[Any], + per_session: dict[str, dict[str, int]], + session_customization_fingerprint_parts: dict[str, set[str]], + session_customization_labels: dict[str, set[str]], + session_metrics: dict[str, dict[str, Any]], + session_engineer: dict[str, tuple[str, str]], + profile_by_engineer_id: dict[str, EngineerLeverageProfile], + compound_reliability_chain_length: int = DEFAULT_COMPOUND_RELIABILITY_CHAIN_LENGTH, +) -> list[HarnessConfigurationFingerprint]: + """Build compact fingerprints for repeated harness configurations.""" + session_source_metadata = {row.id: row.source_metadata or {} for row in source_metadata_rows} + harness_fingerprint_data: dict[str, dict[str, Any]] = {} + + for row in source_metadata_rows: + tools = per_session.get(row.id, {}) + if not tools and row.id not in session_customization_fingerprint_parts: + continue + tool_names = tuple(sorted(tool_name for tool_name, count in tools.items() if count > 0)) + customization_parts = tuple( + sorted(session_customization_fingerprint_parts.get(row.id, set())) + ) + customization_names = tuple(sorted(session_customization_labels.get(row.id, set()))) + permission_mode = row.permission_mode or "default" + context_count = context_signal_count(session_source_metadata.get(row.id, {})) + context_bucket = "context:present" if context_count > 0 else "context:none" + raw_parts = [ + f"agent:{row.agent_type}", + f"permission:{permission_mode}", + *[f"tool:{tool}" for tool in tool_names], + *[f"customization:{customization}" for customization in customization_parts], + context_bucket, + ] + fingerprint_id = sha256("|".join(raw_parts).encode()).hexdigest()[:16] + bucket = harness_fingerprint_data.setdefault( + fingerprint_id, + { + "fingerprint_id": fingerprint_id, + "agent_type": row.agent_type, + "permission_mode": permission_mode, + "sessions": set(), + "engineers": set(), + "success_sessions": set(), + "leverage_scores": [], + "tool_counts": [], + "customization_counts": [], + "context_signal_counts": [], + "tools": Counter(), + "customizations": Counter(), + "signals": Counter(), + }, + ) + bucket["sessions"].add(row.id) + session_metric = session_metrics.get(row.id) + if session_metric is not None: + engineer_id_for_session = session_metric["engineer_id"] + if engineer_id_for_session: + bucket["engineers"].add(engineer_id_for_session) + outcome = session_metric["outcome"] + if outcome and is_success_outcome(outcome): + bucket["success_sessions"].add(row.id) + bucket["tool_counts"].append(len(tool_names)) + bucket["customization_counts"].append(len(customization_names)) + bucket["context_signal_counts"].append(context_count) + bucket["tools"].update(tool_names) + bucket["customizations"].update(customization_names) + bucket["signals"].update( + [ + f"agent:{row.agent_type}", + f"permission:{permission_mode}", + "context" if context_count > 0 else "no_context", + ] + ) + if row.id in session_engineer: + engineer_id_for_session = session_engineer[row.id][0] + profile = profile_by_engineer_id.get(engineer_id_for_session) + if profile is not None: + bucket["leverage_scores"].append(profile.leverage_score) + + return [ + HarnessConfigurationFingerprint( + fingerprint_id=bucket["fingerprint_id"], + label=( + f"{bucket['agent_type']} / {bucket['permission_mode']} / " + f"{round(sum(bucket['tool_counts']) / len(bucket['tool_counts']), 1)} tools" + ), + agent_type=bucket["agent_type"], + permission_mode=bucket["permission_mode"], + session_count=len(bucket["sessions"]), + engineer_count=len(bucket["engineers"]), + success_rate=( + round(len(bucket["success_sessions"]) / len(bucket["sessions"]), 3) + if bucket["sessions"] + else None + ), + avg_leverage_score=( + round(sum(bucket["leverage_scores"]) / len(bucket["leverage_scores"]), 1) + if bucket["leverage_scores"] + else 0.0 + ), + compound_reliability_rate=_fingerprint_compound_rate( + bucket, + compound_reliability_chain_length, + ), + tool_count=round(sum(bucket["tool_counts"]) / len(bucket["tool_counts"])) + if bucket["tool_counts"] + else 0, + customization_count=round( + sum(bucket["customization_counts"]) / len(bucket["customization_counts"]) + ) + if bucket["customization_counts"] + else 0, + context_signal_count=round( + sum(bucket["context_signal_counts"]) / len(bucket["context_signal_counts"]) + ) + if bucket["context_signal_counts"] + else 0, + top_tools=[tool for tool, _count in bucket["tools"].most_common(5)], + top_customizations=[ + customization for customization, _count in bucket["customizations"].most_common(5) + ], + signals=[signal for signal, _count in bucket["signals"].most_common(5)], + ) + for bucket in sorted( + harness_fingerprint_data.values(), + key=lambda item: ( + -len(item["sessions"]), + -( + sum(item["leverage_scores"]) / len(item["leverage_scores"]) + if item["leverage_scores"] + else 0.0 + ), + item["fingerprint_id"], + ), + ) + ][:25] + + +def _fingerprint_compound_rate(bucket: dict[str, Any], chain_length: int) -> float | None: + if not bucket["sessions"]: + return None + avg_steps = ( + sum(bucket["tool_counts"]) / len(bucket["tool_counts"]) if bucket["tool_counts"] else 0 + ) + if avg_steps <= 0: + return None + success_rate = len(bucket["success_sessions"]) / len(bucket["sessions"]) + per_step_rate = success_rate ** (1.0 / avg_steps) + return round(per_step_rate**chain_length, 3) diff --git a/src/primer/server/services/maturity_reliability.py b/src/primer/server/services/maturity_reliability.py new file mode 100644 index 0000000..bcfd6c5 --- /dev/null +++ b/src/primer/server/services/maturity_reliability.py @@ -0,0 +1,151 @@ +"""Toolchain reliability helpers for maturity analytics.""" + +from collections import Counter +from typing import Any + +from primer.common.facet_taxonomy import is_success_outcome +from primer.common.schemas import ToolchainReliabilityEntry + +DEFAULT_COMPOUND_RELIABILITY_CHAIN_LENGTH = 10 + + +def ensure_reliability_bucket( + reliability_data: dict[tuple[str, str, str | None, str | None], dict[str, Any]], + *, + surface_type: str, + identifier: str, + provenance: str | None, + source_classification: str | None, +) -> dict[str, Any]: + """Return the aggregate bucket for one tool/customization surface.""" + return reliability_data.setdefault( + (surface_type, identifier, provenance, source_classification), + { + "identifier": identifier, + "surface_type": surface_type, + "provenance": provenance, + "source_classification": source_classification, + "sessions": set(), + "engineers": set(), + "friction_sessions": set(), + "failure_sessions": set(), + "recovered_sessions": set(), + "abandoned_sessions": set(), + "recovery_steps": [], + "success_sessions": set(), + "friction_type_counts": Counter(), + "total_call_count": 0, + }, + ) + + +def apply_session_reliability_metrics( + bucket: dict[str, Any], + session_id: str, + session_metric: dict[str, object] | None, +) -> None: + """Fold per-session friction, recovery, and outcome signals into a bucket.""" + if session_metric is None: + return + if session_metric["has_friction"]: + bucket["friction_sessions"].add(session_id) + bucket["friction_type_counts"].update(session_metric["friction_counts"]) + if session_metric["has_failure"]: + bucket["failure_sessions"].add(session_id) + outcome = session_metric["outcome"] + if outcome and is_success_outcome(outcome): + bucket["success_sessions"].add(session_id) + if session_metric["recovery_result"] == "recovered": + bucket["recovered_sessions"].add(session_id) + if session_metric["recovery_result"] == "abandoned": + bucket["abandoned_sessions"].add(session_id) + recovery_steps = session_metric["recovery_step_count"] + if isinstance(recovery_steps, int) and recovery_steps > 0: + bucket["recovery_steps"].append(recovery_steps) + + +def build_toolchain_reliability( + reliability_data: dict[tuple[str, str, str | None, str | None], dict[str, Any]], + *, + compound_reliability_chain_length: int = DEFAULT_COMPOUND_RELIABILITY_CHAIN_LENGTH, +) -> list[ToolchainReliabilityEntry]: + """Convert reliability buckets into sorted API response rows.""" + return [ + ToolchainReliabilityEntry( + identifier=bucket["identifier"], + surface_type=bucket["surface_type"], + provenance=bucket["provenance"], + source_classification=bucket["source_classification"], + session_count=len(bucket["sessions"]), + engineer_count=len(bucket["engineers"]), + total_call_count=bucket["total_call_count"], + avg_calls_per_session=( + round(bucket["total_call_count"] / len(bucket["sessions"]), 1) + if bucket["sessions"] + else 0.0 + ), + friction_session_count=len(bucket["friction_sessions"]), + friction_session_rate=( + round(len(bucket["friction_sessions"]) / len(bucket["sessions"]), 3) + if bucket["sessions"] + else None + ), + failure_session_count=len(bucket["failure_sessions"]), + failure_session_rate=( + round(len(bucket["failure_sessions"]) / len(bucket["sessions"]), 3) + if bucket["sessions"] + else None + ), + recovery_rate=( + round( + len(bucket["recovered_sessions"]) / len(bucket["friction_sessions"]), + 3, + ) + if bucket["friction_sessions"] + else None + ), + success_rate=( + round(len(bucket["success_sessions"]) / len(bucket["sessions"]), 3) + if bucket["sessions"] + else None + ), + compound_reliability_rate=_compound_reliability_rate( + bucket, + compound_reliability_chain_length, + ), + abandonment_rate=( + round(len(bucket["abandoned_sessions"]) / len(bucket["sessions"]), 3) + if bucket["sessions"] + else None + ), + avg_recovery_steps=( + round(sum(bucket["recovery_steps"]) / len(bucket["recovery_steps"]), 1) + if bucket["recovery_steps"] + else None + ), + top_friction_types=[ + friction_type + for friction_type, _count in bucket["friction_type_counts"].most_common(3) + ], + ) + for bucket in sorted( + reliability_data.values(), + key=lambda item: ( + -len(item["failure_sessions"]), + -len(item["friction_sessions"]), + -len(item["sessions"]), + item["identifier"], + ), + ) + ] + + +def _compound_reliability_rate(bucket: dict[str, Any], chain_length: int) -> float | None: + if not bucket["sessions"]: + return None + success_rate = len(bucket["success_sessions"]) / len(bucket["sessions"]) + avg_calls = bucket["total_call_count"] / len(bucket["sessions"]) + if avg_calls <= 0: + return None + per_call_rate = success_rate ** (1.0 / avg_calls) + return round(per_call_rate**chain_length, 3) diff --git a/src/primer/server/services/maturity_service.py b/src/primer/server/services/maturity_service.py index f19d9ee..105fa9a 100644 --- a/src/primer/server/services/maturity_service.py +++ b/src/primer/server/services/maturity_service.py @@ -2,7 +2,7 @@ from collections import Counter, defaultdict from datetime import datetime -from hashlib import md5, sha256 +from hashlib import md5 from statistics import median from sqlalchemy import func @@ -12,7 +12,6 @@ from primer.common.facet_taxonomy import canonical_outcome, is_success_outcome from primer.common.models import ( Engineer, - GitRepository, ModelUsage, PullRequest, SessionCommit, @@ -29,22 +28,18 @@ from primer.common.schemas import ( AgentSkillUsage, AgentTeamModeSummary, - ContextQualityEntry, CustomizationOutcomeAttribution, CustomizationStateFunnel, CustomizationUsage, DailyLeverageEntry, DelegationPatternSummary, EngineerLeverageProfile, - HarnessConfigurationFingerprint, HighPerformerStack, LeverageBreakdown, MaturityAnalyticsResponse, - ProjectReadinessEntry, StackCustomization, TeamCustomizationLandscape, ToolCategoryBreakdown, - ToolchainReliabilityEntry, ) from primer.common.tool_classification import ( CATEGORIES, @@ -57,6 +52,17 @@ from primer.server.services.analytics_service import base_session_query from primer.server.services.delegation_graph_service import extract_session_delegation_edges from primer.server.services.effectiveness_service import build_effectiveness_score +from primer.server.services.maturity_context_quality import ( + build_project_readiness_and_context_quality, +) +from primer.server.services.maturity_harness_fingerprints import ( + build_harness_configuration_fingerprints, +) +from primer.server.services.maturity_reliability import ( + apply_session_reliability_metrics, + build_toolchain_reliability, + ensure_reliability_bucket, +) _RELIABILITY_FAILURE_FRICTION_TYPES = frozenset({"tool_error", "exec_error", "timeout"}) _COMPOUND_RELIABILITY_CHAIN_LENGTH = 10 @@ -688,62 +694,19 @@ def get_maturity_analytics( # 4b. Toolchain reliability reliability_data: dict[tuple[str, str, str | None, str | None], dict] = {} - def _ensure_reliability_bucket( - surface_type: str, - identifier: str, - provenance: str | None, - source_classification: str | None, - ) -> dict: - return reliability_data.setdefault( - (surface_type, identifier, provenance, source_classification), - { - "identifier": identifier, - "surface_type": surface_type, - "provenance": provenance, - "source_classification": source_classification, - "sessions": set(), - "engineers": set(), - "friction_sessions": set(), - "failure_sessions": set(), - "recovered_sessions": set(), - "abandoned_sessions": set(), - "recovery_steps": [], - "success_sessions": set(), - "friction_type_counts": Counter(), - "total_call_count": 0, - }, - ) - - def _apply_session_reliability_metrics( - bucket: dict, - session_id: str, - session_metric: dict[str, object] | None, - ) -> None: - if session_metric is None: - return - if session_metric["has_friction"]: - bucket["friction_sessions"].add(session_id) - bucket["friction_type_counts"].update(session_metric["friction_counts"]) - if session_metric["has_failure"]: - bucket["failure_sessions"].add(session_id) - outcome = session_metric["outcome"] - if outcome and is_success_outcome(outcome): - bucket["success_sessions"].add(session_id) - if session_metric["recovery_result"] == "recovered": - bucket["recovered_sessions"].add(session_id) - if session_metric["recovery_result"] == "abandoned": - bucket["abandoned_sessions"].add(session_id) - recovery_steps = session_metric["recovery_step_count"] - if isinstance(recovery_steps, int) and recovery_steps > 0: - bucket["recovery_steps"].append(recovery_steps) - for sid, tools in per_session.items(): for tool_name, call_count in tools.items(): if call_count <= 0 or tool_name.startswith("Skill:") or tool_name.startswith("Task:"): continue if classify_tool(tool_name) == "mcp": continue - bucket = _ensure_reliability_bucket("built_in_tool", tool_name, "built_in", "built_in") + bucket = ensure_reliability_bucket( + reliability_data, + surface_type="built_in_tool", + identifier=tool_name, + provenance="built_in", + source_classification="built_in", + ) bucket["sessions"].add(sid) bucket["total_call_count"] += call_count if sid in session_metrics: @@ -751,7 +714,7 @@ def _apply_session_reliability_metrics( engineer_id_for_session = session_metric["engineer_id"] if engineer_id_for_session: bucket["engineers"].add(engineer_id_for_session) - _apply_session_reliability_metrics(bucket, sid, session_metric) + apply_session_reliability_metrics(bucket, sid, session_metric) # 5. Explicit customization breakdown customization_data: dict[tuple[str, str, str, str], dict] = {} @@ -853,31 +816,22 @@ def _apply_session_reliability_metrics( bucket["projects"][project_name] += 1 bucket["engineer_names"][engineer_names.get(engineer_id, "Unknown")] += 1 - reliability_bucket = _ensure_reliability_bucket( - customization_type, - identifier, - provenance, - source_classification, + reliability_bucket = ensure_reliability_bucket( + reliability_data, + surface_type=customization_type, + identifier=identifier, + provenance=provenance, + source_classification=source_classification, ) reliability_bucket["sessions"].add(session_id) reliability_bucket["engineers"].add(engineer_id) reliability_bucket["total_call_count"] += invocation_count or 0 - _apply_session_reliability_metrics( + apply_session_reliability_metrics( reliability_bucket, session_id, session_metrics.get(session_id), ) - def _compound_reliability_rate(bucket: dict) -> float | None: - if not bucket["sessions"]: - return None - success_rate = len(bucket["success_sessions"]) / len(bucket["sessions"]) - avg_calls = bucket["total_call_count"] / len(bucket["sessions"]) - if avg_calls <= 0: - return None - per_call_rate = success_rate ** (1.0 / avg_calls) - return round(per_call_rate**_COMPOUND_RELIABILITY_CHAIN_LENGTH, 3) - customization_breakdown = [ CustomizationUsage( identifier=bucket["identifier"], @@ -947,71 +901,10 @@ def _compound_reliability_rate(bucket: dict) -> float | None: ) ] - toolchain_reliability = [ - ToolchainReliabilityEntry( - identifier=bucket["identifier"], - surface_type=bucket["surface_type"], - provenance=bucket["provenance"], - source_classification=bucket["source_classification"], - session_count=len(bucket["sessions"]), - engineer_count=len(bucket["engineers"]), - total_call_count=bucket["total_call_count"], - avg_calls_per_session=( - round(bucket["total_call_count"] / len(bucket["sessions"]), 1) - if bucket["sessions"] - else 0.0 - ), - friction_session_count=len(bucket["friction_sessions"]), - friction_session_rate=( - round(len(bucket["friction_sessions"]) / len(bucket["sessions"]), 3) - if bucket["sessions"] - else None - ), - failure_session_count=len(bucket["failure_sessions"]), - failure_session_rate=( - round(len(bucket["failure_sessions"]) / len(bucket["sessions"]), 3) - if bucket["sessions"] - else None - ), - recovery_rate=( - round( - len(bucket["recovered_sessions"]) / len(bucket["friction_sessions"]), - 3, - ) - if bucket["friction_sessions"] - else None - ), - success_rate=( - round(len(bucket["success_sessions"]) / len(bucket["sessions"]), 3) - if bucket["sessions"] - else None - ), - compound_reliability_rate=_compound_reliability_rate(bucket), - abandonment_rate=( - round(len(bucket["abandoned_sessions"]) / len(bucket["sessions"]), 3) - if bucket["sessions"] - else None - ), - avg_recovery_steps=( - round(sum(bucket["recovery_steps"]) / len(bucket["recovery_steps"]), 1) - if bucket["recovery_steps"] - else None - ), - top_friction_types=[ - friction_type - for friction_type, _count in bucket["friction_type_counts"].most_common(3) - ], - ) - for bucket in sorted( - reliability_data.values(), - key=lambda item: ( - -len(item["failure_sessions"]), - -len(item["friction_sessions"]), - -len(item["sessions"]), - item["identifier"], - ), - ) - ] + toolchain_reliability = build_toolchain_reliability( + reliability_data, + compound_reliability_chain_length=_COMPOUND_RELIABILITY_CHAIN_LENGTH, + ) total_engineers = len(all_engineer_ids) profile_by_engineer_id = {profile.engineer_id: profile for profile in engineer_profiles} @@ -1026,153 +919,16 @@ def _compound_reliability_rate(bucket: dict) -> float | None: .filter(SessionModel.id.in_(db.query(session_id_subq.c.id))) .all() ) - session_source_metadata = {row.id: row.source_metadata or {} for row in source_metadata_rows} - - def _context_signal_count(source_metadata: object) -> int: - if not isinstance(source_metadata, dict): - return 0 - native_telemetry = source_metadata.get("native_telemetry") - if not isinstance(native_telemetry, dict): - return 0 - context_usage = native_telemetry.get("context_usage") - if isinstance(context_usage, dict): - reference_count = context_usage.get("reference_count") - if isinstance(reference_count, int | float): - return max(int(reference_count), 0) - return 0 - - harness_fingerprint_data: dict[str, dict] = {} - for row in source_metadata_rows: - tools = per_session.get(row.id, {}) - if not tools and row.id not in session_customization_fingerprint_parts: - continue - tool_names = tuple(sorted(tool_name for tool_name, count in tools.items() if count > 0)) - customization_parts = tuple( - sorted(session_customization_fingerprint_parts.get(row.id, set())) - ) - customization_names = tuple(sorted(session_customization_labels.get(row.id, set()))) - permission_mode = row.permission_mode or "default" - context_count = _context_signal_count(session_source_metadata.get(row.id, {})) - context_bucket = "context:present" if context_count > 0 else "context:none" - raw_parts = [ - f"agent:{row.agent_type}", - f"permission:{permission_mode}", - *[f"tool:{tool}" for tool in tool_names], - *[f"customization:{customization}" for customization in customization_parts], - context_bucket, - ] - fingerprint_id = sha256("|".join(raw_parts).encode()).hexdigest()[:16] - bucket = harness_fingerprint_data.setdefault( - fingerprint_id, - { - "fingerprint_id": fingerprint_id, - "agent_type": row.agent_type, - "permission_mode": permission_mode, - "sessions": set(), - "engineers": set(), - "success_sessions": set(), - "leverage_scores": [], - "tool_counts": [], - "customization_counts": [], - "context_signal_counts": [], - "tools": Counter(), - "customizations": Counter(), - "signals": Counter(), - }, - ) - bucket["sessions"].add(row.id) - session_metric = session_metrics.get(row.id) - if session_metric is not None: - engineer_id_for_session = session_metric["engineer_id"] - if engineer_id_for_session: - bucket["engineers"].add(engineer_id_for_session) - outcome = session_metric["outcome"] - if outcome and is_success_outcome(outcome): - bucket["success_sessions"].add(row.id) - bucket["tool_counts"].append(len(tool_names)) - bucket["customization_counts"].append(len(customization_names)) - bucket["context_signal_counts"].append(context_count) - bucket["tools"].update(tool_names) - bucket["customizations"].update(customization_names) - bucket["signals"].update( - [ - f"agent:{row.agent_type}", - f"permission:{permission_mode}", - "context" if context_count > 0 else "no_context", - ] - ) - if row.id in session_engineer: - engineer_id_for_session = session_engineer[row.id][0] - profile = profile_by_engineer_id.get(engineer_id_for_session) - if profile is not None: - bucket["leverage_scores"].append(profile.leverage_score) - - def _fingerprint_compound_rate(bucket: dict) -> float | None: - if not bucket["sessions"]: - return None - avg_steps = ( - sum(bucket["tool_counts"]) / len(bucket["tool_counts"]) if bucket["tool_counts"] else 0 - ) - if avg_steps <= 0: - return None - success_rate = len(bucket["success_sessions"]) / len(bucket["sessions"]) - per_step_rate = success_rate ** (1.0 / avg_steps) - return round(per_step_rate**_COMPOUND_RELIABILITY_CHAIN_LENGTH, 3) - - harness_configuration_fingerprints = [ - HarnessConfigurationFingerprint( - fingerprint_id=bucket["fingerprint_id"], - label=( - f"{bucket['agent_type']} / {bucket['permission_mode']} / " - f"{round(sum(bucket['tool_counts']) / len(bucket['tool_counts']), 1)} tools" - ), - agent_type=bucket["agent_type"], - permission_mode=bucket["permission_mode"], - session_count=len(bucket["sessions"]), - engineer_count=len(bucket["engineers"]), - success_rate=( - round(len(bucket["success_sessions"]) / len(bucket["sessions"]), 3) - if bucket["sessions"] - else None - ), - avg_leverage_score=( - round(sum(bucket["leverage_scores"]) / len(bucket["leverage_scores"]), 1) - if bucket["leverage_scores"] - else 0.0 - ), - compound_reliability_rate=_fingerprint_compound_rate(bucket), - tool_count=round(sum(bucket["tool_counts"]) / len(bucket["tool_counts"])) - if bucket["tool_counts"] - else 0, - customization_count=round( - sum(bucket["customization_counts"]) / len(bucket["customization_counts"]) - ) - if bucket["customization_counts"] - else 0, - context_signal_count=round( - sum(bucket["context_signal_counts"]) / len(bucket["context_signal_counts"]) - ) - if bucket["context_signal_counts"] - else 0, - top_tools=[tool for tool, _count in bucket["tools"].most_common(5)], - top_customizations=[ - customization for customization, _count in bucket["customizations"].most_common(5) - ], - signals=[signal for signal, _count in bucket["signals"].most_common(5)], - ) - for bucket in sorted( - harness_fingerprint_data.values(), - key=lambda item: ( - -len(item["sessions"]), - -( - sum(item["leverage_scores"]) / len(item["leverage_scores"]) - if item["leverage_scores"] - else 0.0 - ), - item["fingerprint_id"], - ), - ) - ][:25] + harness_configuration_fingerprints = build_harness_configuration_fingerprints( + source_metadata_rows=source_metadata_rows, + per_session=per_session, + session_customization_fingerprint_parts=session_customization_fingerprint_parts, + session_customization_labels=session_customization_labels, + session_metrics=session_metrics, + session_engineer=session_engineer, + profile_by_engineer_id=profile_by_engineer_id, + compound_reliability_chain_length=_COMPOUND_RELIABILITY_CHAIN_LENGTH, + ) def _avg(values: list[float | None]) -> float | None: present = [value for value in values if value is not None] @@ -1492,218 +1248,13 @@ def _team_customization_label( ) # 6. Project readiness and context quality - project_readiness: list[ProjectReadinessEntry] = [] - context_quality: list[ContextQualityEntry] = [] - if sessions_analyzed > 0: - repo_session_rows = ( - db.query( - SessionModel.id, - SessionModel.repository_id, - SessionModel.input_tokens, - SessionModel.cache_read_tokens, - SessionModel.source_metadata, - ) - .filter( - SessionModel.id.in_(db.query(session_id_subq.c.id)), - SessionModel.repository_id.isnot(None), - ) - .all() - ) - facet_session_ids = { - row.session_id - for row in ( - db.query(SessionFacets.session_id) - .filter(SessionFacets.session_id.in_(db.query(session_id_subq.c.id))) - .all() - ) - } - model_session_ids = {session_id for session_id, *_rest in model_rows} - repo_context_buckets: dict[str, dict] = {} - for ( - session_id, - repository_id, - input_tokens, - cache_read_tokens, - source_metadata, - ) in repo_session_rows: - if repository_id is None: - continue - bucket = repo_context_buckets.setdefault( - repository_id, - { - "sessions": set(), - "input_tokens": 0, - "cache_read_tokens": 0, - "tool_sessions": set(), - "model_sessions": set(), - "facet_sessions": set(), - "context_usage_sessions": set(), - }, - ) - bucket["sessions"].add(session_id) - bucket["input_tokens"] += input_tokens or 0 - bucket["cache_read_tokens"] += cache_read_tokens or 0 - if per_session.get(session_id): - bucket["tool_sessions"].add(session_id) - if session_id in model_session_ids: - bucket["model_sessions"].add(session_id) - if session_id in facet_session_ids: - bucket["facet_sessions"].add(session_id) - if _context_signal_count(source_metadata) > 0: - bucket["context_usage_sessions"].add(session_id) - - def _coverage_pct(count: int, total: int) -> float: - return round((count / total) * 100, 1) if total > 0 else 0.0 - - def _guide_freshness_score(checked_at: datetime | None) -> float: - if checked_at is None: - return 0.0 - now = datetime.now(tz=checked_at.tzinfo) if checked_at.tzinfo else datetime.now() - age_days = max((now - checked_at).days, 0) - if age_days <= 14: - return 100.0 - if age_days <= 30: - return 80.0 - if age_days <= 90: - return 50.0 - return 25.0 - - def _prompt_efficiency_score(avg_input_tokens: float | None) -> float: - if avg_input_tokens is None: - return 0.0 - if avg_input_tokens <= 20_000: - return 1.0 - if avg_input_tokens <= 50_000: - return 0.8 - if avg_input_tokens <= 100_000: - return 0.5 - return 0.25 - - if repo_context_buckets: - repos = ( - db.query(GitRepository) - .filter( - GitRepository.id.in_(list(repo_context_buckets.keys())), - ) - .all() - ) - repos_by_id = {repo.id: repo for repo in repos} - for repo in repos: - if repo.ai_readiness_score is not None: - project_readiness.append( - ProjectReadinessEntry( - repository=repo.full_name, - has_claude_md=repo.has_claude_md or False, - has_agents_md=repo.has_agents_md or False, - has_claude_dir=repo.has_claude_dir or False, - ai_readiness_score=repo.ai_readiness_score or 0.0, - session_count=len(repo_context_buckets[repo.id]["sessions"]), - ) - ) - for repository_id, bucket in repo_context_buckets.items(): - repo = repos_by_id.get(repository_id) - if repo is None: - continue - session_count = len(bucket["sessions"]) - input_tokens = bucket["input_tokens"] - cache_read_tokens = bucket["cache_read_tokens"] - token_denominator = input_tokens + cache_read_tokens - cache_hit_rate = ( - round(cache_read_tokens / token_denominator, 3) - if token_denominator > 0 - else None - ) - avg_input_tokens = ( - round(input_tokens / session_count, 1) - if session_count > 0 and token_denominator > 0 - else None - ) - - guide_coverage_score = ( - repo.ai_readiness_score - if repo.ai_readiness_score is not None - else ( - (50.0 if repo.has_claude_md else 0.0) - + (20.0 if repo.has_agents_md else 0.0) - + (30.0 if repo.has_claude_dir else 0.0) - ) - ) - guide_freshness_score = _guide_freshness_score(repo.ai_readiness_checked_at) - context_usage_coverage_pct = _coverage_pct( - len(bucket["context_usage_sessions"]), - session_count, - ) - tool_coverage_pct = _coverage_pct(len(bucket["tool_sessions"]), session_count) - model_coverage_pct = _coverage_pct(len(bucket["model_sessions"]), session_count) - facet_coverage_pct = _coverage_pct(len(bucket["facet_sessions"]), session_count) - sensor_coverage_score = round( - ( - context_usage_coverage_pct - + tool_coverage_pct - + model_coverage_pct - + facet_coverage_pct - ) - / 4, - 1, - ) - cache_score = min((cache_hit_rate or 0.0) / 0.5, 1.0) - token_efficiency_score = round( - ((cache_score * 0.6) + (_prompt_efficiency_score(avg_input_tokens) * 0.4)) - * 100, - 1, - ) - context_quality_score = round( - (guide_coverage_score * 0.30) - + (guide_freshness_score * 0.15) - + (token_efficiency_score * 0.25) - + (sensor_coverage_score * 0.30), - 1, - ) - gaps: list[str] = [] - if not repo.has_claude_md: - gaps.append("Add CLAUDE.md") - if not repo.has_agents_md: - gaps.append("Add AGENTS.md") - if guide_freshness_score < 75: - gaps.append("Refresh guidance scan") - if cache_hit_rate is None: - gaps.append("Add token/cache telemetry") - elif cache_hit_rate < 0.25: - gaps.append("Improve cache reuse") - if avg_input_tokens is not None and avg_input_tokens > 50_000: - gaps.append("Trim prompt/context payloads") - if context_usage_coverage_pct < 50: - gaps.append("Increase context telemetry coverage") - if tool_coverage_pct < 90: - gaps.append("Complete tool telemetry") - if model_coverage_pct < 90: - gaps.append("Complete model telemetry") - if facet_coverage_pct < 90: - gaps.append("Complete outcome facets") - - context_quality.append( - ContextQualityEntry( - repository=repo.full_name, - session_count=session_count, - context_quality_score=context_quality_score, - guide_coverage_score=round(guide_coverage_score, 1), - guide_freshness_score=guide_freshness_score, - token_efficiency_score=token_efficiency_score, - sensor_coverage_score=sensor_coverage_score, - cache_hit_rate=cache_hit_rate, - avg_input_tokens=avg_input_tokens, - context_usage_coverage_pct=context_usage_coverage_pct, - tool_coverage_pct=tool_coverage_pct, - model_coverage_pct=model_coverage_pct, - facet_coverage_pct=facet_coverage_pct, - has_claude_md=repo.has_claude_md or False, - has_agents_md=repo.has_agents_md or False, - readiness_checked_at=repo.ai_readiness_checked_at, - top_gaps=gaps[:4], - ) - ) - project_readiness.sort(key=lambda p: p.ai_readiness_score, reverse=True) - context_quality.sort(key=lambda row: (-row.context_quality_score, -row.session_count)) + project_readiness, context_quality = build_project_readiness_and_context_quality( + db=db, + session_id_subq=session_id_subq, + model_rows=model_rows, + per_session=per_session, + sessions_analyzed=sessions_analyzed, + ) # Aggregate metrics avg_leverage = (