diff --git a/.env.example b/.env.example index 2b1f38e..175e94d 100644 --- a/.env.example +++ b/.env.example @@ -154,3 +154,7 @@ FEATURE_HEALTH_MONITOR=true FEATURE_SCHEDULER=true FEATURE_TOOLS_FABRIC=true FEATURE_RISK_TIERED_GOVERNANCE=true + +# Optional federation control plane. +# FEATURE_FEDERATION=false +# FEATURE_FEDERATION_DASHBOARD=false diff --git a/Dockerfile b/Dockerfile index 430b177..f7542d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -66,8 +66,11 @@ RUN mkdir -p /home/lancelot/data && \ # Build War Room React SPA RUN cd src/warroom && npm ci && npm run build && rm -rf node_modules -# Change ownership of the application directory to the non-root user -RUN chown -R lancelot:lancelot /home/lancelot +# Runtime-writable paths are owned by the non-root user. The application, +# virtualenv, and browser binaries remain root-owned/readable, which avoids +# an expensive recursive chown over the full image during every rebuild. +RUN mkdir -p /home/lancelot/data /home/lancelot/workspace /home/lancelot/.codex && \ + chown -R lancelot:lancelot /home/lancelot/data /home/lancelot/workspace /home/lancelot/.codex # F-001: Docker group no longer needed — socket proxy used instead of direct mount diff --git a/config/federation.yaml b/config/federation.yaml index 22d7d51..4fb723f 100644 --- a/config/federation.yaml +++ b/config/federation.yaml @@ -64,3 +64,13 @@ cost_report_interval_s: 30.0 # How often children report cost to root # ── Self Address ──────────────────────────────────────────── # This instance's externally-reachable address for peer registration # self_address: "http://localhost:8000" + +# Dashboard settings for the operator-facing fleet view. +dashboard: + enabled: true + poll_interval_s: 10 + stream_interval_s: 3 + max_recent_activity_items: 50 + card_sort_order: "urgency" # urgency | alphabetical | role + show_fleet_activity_feed: true + activity_feed_max_events: 200 diff --git a/config/model_profiles.yaml b/config/model_profiles.yaml index 5f18b61..93c73df 100644 --- a/config/model_profiles.yaml +++ b/config/model_profiles.yaml @@ -106,6 +106,34 @@ profiles: cost_input_per_1k: 0.00113 cost_output_per_1k: 0.0045 + gpt-5.5: + capability_tier: deep + context_window: 1000000 + supports_tools: true + cost_input_per_1k: 0.005 + cost_output_per_1k: 0.03 + + gpt-5.4: + capability_tier: deep + context_window: 1000000 + supports_tools: true + cost_input_per_1k: 0.0025 + cost_output_per_1k: 0.015 + + gpt-5.4-mini: + capability_tier: fast + context_window: 400000 + supports_tools: true + cost_input_per_1k: 0.00075 + cost_output_per_1k: 0.0045 + + gpt-5.4-nano: + capability_tier: fast + context_window: 400000 + supports_tools: true + cost_input_per_1k: 0.00005 + cost_output_per_1k: 0.0004 + # ── Anthropic ────────────────────────────────────────────────────── claude-3-5-haiku-latest: capability_tier: fast diff --git a/config/models.yaml b/config/models.yaml index 5e3e6e1..4af9fd8 100644 --- a/config/models.yaml +++ b/config/models.yaml @@ -47,7 +47,7 @@ providers: max_tokens: 16384 temperature: 0.3 deep: - model: "gpt-5.4" + model: "gpt-5.5" max_tokens: 128000 temperature: 0.7 cache: @@ -63,7 +63,7 @@ providers: max_tokens: 16384 temperature: 0.3 deep: - model: "gpt-5.4" + model: "gpt-5.5" max_tokens: 128000 temperature: 0.7 cache: diff --git a/docs/INDEX.md b/docs/INDEX.md index bd29a34..6cdbc8a 100644 --- a/docs/INDEX.md +++ b/docs/INDEX.md @@ -7,7 +7,7 @@ - [Proof Walkthrough](proof-walkthrough.md) - [Release Verification](release-verification.md) - [Configuration Reference](configuration-reference.md) -- [War Room Guide](war-room.md) +- [War Room Guide](war-room.md) - includes Fleet Dashboard operator workflow ## Architecture @@ -18,7 +18,7 @@ - [Receipts](receipts.md) - [UAB](uab.md) - [HIVE](hive.md) -- [Federation](federation.md) +- [Federation](federation.md) - includes Fleet Dashboard API and proxy approval contract - [MCP Governance](mcp.md) ## Security and Operations diff --git a/docs/federation.md b/docs/federation.md index 2470f70..2caaaaf 100644 --- a/docs/federation.md +++ b/docs/federation.md @@ -39,6 +39,62 @@ Mode is derived automatically from the topology shape — not configured directl --- +## Fleet Dashboard + +The Fleet Dashboard is the operator control layer above individual War Rooms. It is exposed in War Room at `/war-room/federation/fleet` when `FEATURE_FEDERATION_DASHBOARD=true` and `FEATURE_FEDERATION=true`. + +It is designed to answer one operational question first: which Lancelot instances need human attention right now? + +### Snapshot Contract + +The dashboard control plane uses `GET /api/federation/dashboard` for the fleet snapshot and `GET /api/federation/dashboard/stream` for live snapshot updates. The root dashboard fetches each reachable peer through the signed `GET /api/federation/dashboard/local` endpoint, so peer detail uses the same Ed25519 federation request-signing and replay protection as the rest of the federation plane. + +The snapshot includes: + +- fleet counters: total instances, instances needing attention, critical instances, pending approvals, active agents, Soul consistency, and fleet cost utilization +- sorted instance cards with health, heartbeat, Soul hash, budget, active HIVE agents, pending approvals, trust proposals, latest receipt activity, and attention reasons +- unified approval queue entries aggregated across the local instance and peers +- unified trust graduation proposals aggregated across the local instance and peers +- fleet activity sourced from receipt streams, not a separate event log + +### Instance Card Semantics + +`health` is the local instance health-monitor/readiness state. Federation notices such as stale cost data, Soul propagation, or peer-detail fetch failures are shown as `Needs Attention` reasons instead of automatically marking the instance health as degraded. + +Important card fields: + +| Field | Meaning | +|-------|---------| +| `command_center_url` | Deep link to the instance Command Center (`/war-room/command`) | +| `health` | Local health snapshot: healthy, degraded, or error | +| `heartbeat` | Federation heartbeat freshness for that instance | +| `budget` | Latest cost utilization and threshold state | +| `recent_activity` | Latest receipt-derived activity description | +| `attention_reasons` | Operator-facing notices that explain amber/red card state | + +The local `SELF` card links inside the current War Room. Remote cards link to the peer War Room address with the Command Center path appended. The War Room auth flow preserves the requested path so a signed-in operator lands back on the deep link after login. + +### Unified Approval Proxy + +Fleet-level approve and deny actions call: + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/dashboard/instances/{instance_id}/approvals/{approval_id}/approve` | Approve a pending local or remote dashboard approval | +| POST | `/dashboard/instances/{instance_id}/approvals/{approval_id}/deny` | Deny a pending local or remote dashboard approval | +| POST | `/dashboard/local/approvals/{approval_id}/approve` | Root-to-peer signed local approval endpoint | +| POST | `/dashboard/local/approvals/{approval_id}/deny` | Root-to-peer signed local denial endpoint | + +Operator decisions require both `federation.admin` and `governance.admin` on the root dashboard path. Remote local endpoints require an operator request or a signed ROOT peer request. Every proxy decision carries operator identity and emits a governance receipt with `federated_proxy=true`. + +### Activity and Attention Sources + +Fleet Activity and card `Latest Activity` are receipt-backed. They surface meaningful receipt descriptions from `action_name`, tool/capability metadata, request text, or receipt action type. + +An instance can be `Healthy` while still `Needs Attention`. Examples include pending approvals, pending trust proposals, stale heartbeat, stale budget telemetry for a current peer, Soul mismatch, remote dashboard detail unavailable, or runtime errors reported by the federation control plane. + +--- + ## Module Reference | Module | Purpose | @@ -456,6 +512,9 @@ The audit engine supports complete timeline reconstruction across all instances, |--------|----------|-------------| | GET | `/stream` | SSE heartbeat stream + initial snapshot | | GET | `/health` | Current health summary | +| GET | `/dashboard` | Fleet Dashboard snapshot with local and peer cards | +| GET | `/dashboard/stream` | Fleet Dashboard SSE snapshot stream | +| GET | `/dashboard/local` | Local dashboard detail for an operator or signed ROOT peer | ### Discovery | Method | Endpoint | Description | @@ -541,6 +600,10 @@ Pause semantics are now fail-closed and runtime-backed: |--------|----------|-------------| | POST | `/killswitch` | Receive kill command | | POST | `/cost/report` | Report cost data | +| POST | `/dashboard/instances/{instance_id}/approvals/{approval_id}/approve` | Approve a dashboard approval on a local or remote instance | +| POST | `/dashboard/instances/{instance_id}/approvals/{approval_id}/deny` | Deny a dashboard approval on a local or remote instance | +| POST | `/dashboard/local/approvals/{approval_id}/approve` | Signed ROOT-to-peer local approval proxy endpoint | +| POST | `/dashboard/local/approvals/{approval_id}/deny` | Signed ROOT-to-peer local denial proxy endpoint | Budget-report authority is intentionally narrow: @@ -604,3 +667,10 @@ Peer state is persisted in SQLite with WAL mode for concurrent reads and thread- | `auth_timestamp_window_s` | 30.0 | Replay protection window | | `nonce_cache_size` | 10000 | Max cached nonces | | `cost_report_interval_s` | 30.0 | Cost reporting interval | +| `dashboard.enabled` | true | Enables dashboard API responses when feature flags are enabled | +| `dashboard.poll_interval_s` | 10.0 | War Room polling interval for dashboard refreshes | +| `dashboard.stream_interval_s` | 3.0 | Dashboard SSE snapshot interval | +| `dashboard.max_recent_activity_items` | 50 | Receipt-backed activity items kept per local snapshot | +| `dashboard.card_sort_order` | urgency | Instance card sort order: urgency, alphabetical, or role | +| `dashboard.show_fleet_activity_feed` | true | Include fleet activity rows in the snapshot | +| `dashboard.activity_feed_max_events` | 200 | Max fleet activity rows returned to War Room | diff --git a/docs/war-room.md b/docs/war-room.md index 6986569..a1a12af 100644 --- a/docs/war-room.md +++ b/docs/war-room.md @@ -576,6 +576,58 @@ That means the page can now reflect real federation control-plane degradation in --- +## Fleet Dashboard + +The Fleet Dashboard is the multi-instance operator view for federated Lancelot deployments. It appears in the Federation navigation when both `FEATURE_FEDERATION=true` and `FEATURE_FEDERATION_DASHBOARD=true`. + +**Access:** `/war-room/federation/fleet` + +Use it as the first screen when you are operating more than one Lancelot instance. It surfaces health awareness, heartbeat freshness, budget state, pending approvals, trust proposals, active HIVE agent counts, recent receipt activity, and attention reasons in one place. + +### Instance Cards + +Each card represents one Lancelot instance and is sorted by urgency by default. The card shows: + +- local health monitor state +- heartbeat freshness +- Soul hash/version signal +- active agent count +- pending approval count +- pending trust proposal count +- budget utilization +- latest receipt-backed activity +- specific `Needs Attention` notices + +The `Open Command Center` button deep-links to that instance's Command Center, not just the War Room root. For the local instance it opens `/war-room/command`; for remote peers it opens the peer address with `/war-room/command` appended. If the operator must sign in first, the auth flow preserves the destination and returns to the deep link after login. + +### Health vs. Needs Attention + +`Health` is the instance readiness/health monitor result. It should only show degraded or error when the instance health snapshot says so. + +`Needs Attention` is broader. A card can be healthy and still need attention because it has pending approvals, pending trust proposals, stale heartbeat, stale budget telemetry, remote detail unavailable, Soul mismatch, or federation runtime notices. + +`Latest Activity` comes from receipts. It should begin populating after governed chat actions, HIVE events, approvals, denials, kills, pauses, or other receipted work runs on that instance. + +### Unified Approval Queue + +The Unified Approval Queue aggregates pending T2/T3 governance approvals across the local instance and federated peers. Approve and Deny actions are sent through the federation dashboard proxy, carry the operator identity, and emit governance receipts. + +Use the instance Governance Dashboard for detailed local review when needed, but the fleet queue is the control point for operating multiple instances without tab hopping. + +### Fleet Activity + +Fleet Activity is receipt-backed. It is not a standalone event log. If no activity appears after an agent or governed command runs, check that the instance is emitting receipts and that remote dashboard detail is reachable through signed federation traffic. + +### Troubleshooting + +If the Fleet Dashboard page is missing, verify both feature flags and `dashboard.enabled` in `config/federation.yaml`. + +If a peer card says remote detail is unavailable, check peer registration, `self_address`, signed federation auth, and the peer's `/api/federation/dashboard/local` endpoint. + +If `Needs Attention` names stale cost peers, confirm those peer IDs are still registered. Stale cost telemetry for unregistered peers should be filtered out. + +--- + ## Tips for Daily Operation 1. **Start your session** by checking the Health panel — make sure everything is green diff --git a/src/core/boot.py b/src/core/boot.py index 1238000..840fd7d 100644 --- a/src/core/boot.py +++ b/src/core/boot.py @@ -189,7 +189,16 @@ def _validate_boot_environment(api_token: str | None) -> BootEnvironment: async def _start_core_runtime_services() -> None: """Start gateway-owned services that critical subsystems depend on.""" librarian.start() - await antigravity.start() + try: + from feature_flags import FEATURE_TOOLS_ANTIGRAVITY + except Exception as exc: + FEATURE_TOOLS_ANTIGRAVITY = False + logger.warning("Antigravity feature flag lookup failed; skipping browser startup: %s", exc) + + if FEATURE_TOOLS_ANTIGRAVITY: + await antigravity.start() + else: + logger.info("Antigravity browser startup skipped (FEATURE_TOOLS_ANTIGRAVITY=false).") def _wire_orchestrator_runtime_services() -> None: diff --git a/src/core/feature_flags.py b/src/core/feature_flags.py index 18ae90c..977346f 100644 --- a/src/core/feature_flags.py +++ b/src/core/feature_flags.py @@ -56,6 +56,7 @@ Federation Environment variables: FEATURE_FEDERATION — default: false (multi-instance federation layer) + FEATURE_FEDERATION_DASHBOARD — default: false (fleet dashboard UI/API) MCP (Model Context Protocol) Environment variables: FEATURE_MCP — default: false (master kill switch for all MCP invocations) @@ -244,6 +245,7 @@ def _env_bool(key: str, default: bool = True) -> bool: # Federation — multi-instance coordination FEATURE_FEDERATION: bool = _env_bool("FEATURE_FEDERATION", default=False) # Master switch for Federation subsystem (Governance API, heartbeat, identity) +FEATURE_FEDERATION_DASHBOARD: bool = _env_bool("FEATURE_FEDERATION_DASHBOARD", default=False) # Operator fleet dashboard above per-instance War Rooms # MCP (Model Context Protocol) — governed tool proxy FEATURE_MCP: bool = _env_bool("FEATURE_MCP", default=False) # Master kill switch for all MCP tool invocations @@ -335,7 +337,7 @@ def reload_flags() -> None: global FEATURE_TOOL_FLOW_STREAMING, FEATURE_ACTION_CARDS global FEATURE_HIVE, FEATURE_HIVE_UAB global FEATURE_VAULT_SECRETS - global FEATURE_FEDERATION + global FEATURE_FEDERATION, FEATURE_FEDERATION_DASHBOARD global FEATURE_MCP global FEATURE_TIME_TRAVEL global FEATURE_A2A @@ -417,6 +419,7 @@ def reload_flags() -> None: # Federation FEATURE_FEDERATION = _env_bool("FEATURE_FEDERATION", default=False) + FEATURE_FEDERATION_DASHBOARD = _env_bool("FEATURE_FEDERATION_DASHBOARD", default=False) # MCP FEATURE_MCP = _env_bool("FEATURE_MCP", default=False) @@ -516,8 +519,8 @@ def log_feature_flags() -> None: FEATURE_VAULT_SECRETS, ) logger.info( - "Federation flags: FEDERATION=%s", - FEATURE_FEDERATION, + "Federation flags: FEDERATION=%s, FEDERATION_DASHBOARD=%s", + FEATURE_FEDERATION, FEATURE_FEDERATION_DASHBOARD, ) logger.info( "MCP flags: MCP=%s", diff --git a/src/core/flags_api.py b/src/core/flags_api.py index 0796653..b3f6984 100644 --- a/src/core/flags_api.py +++ b/src/core/flags_api.py @@ -367,6 +367,13 @@ def init_flags_api(audit_logger=None): "conflicts": [], "warning": "Exposes federation API endpoints. Peers authenticate via Ed25519 challenge/response. All federation events are receipt-traced and audit-logged.", }, + "FEATURE_FEDERATION_DASHBOARD": { + "description": "Federation Dashboard - operator fleet view above per-instance War Rooms. Surfaces instance health, pending approvals, trust proposals, budget pressure, and Command Center entry points.", + "category": "Federation", + "requires": ["FEATURE_FEDERATION"], + "conflicts": [], + "warning": "Shows cross-instance operational metadata to authenticated War Room operators. Remote detail retrieval uses signed federation requests.", + }, # ── MCP (Model Context Protocol) ──────────────────────────────── "FEATURE_MCP": { diff --git a/src/core/gateway_boot_support.py b/src/core/gateway_boot_support.py index 2cae49d..f5eb720 100644 --- a/src/core/gateway_boot_support.py +++ b/src/core/gateway_boot_support.py @@ -1359,30 +1359,46 @@ def _bootstrap_model_discovery(): _persisted_config = load_persisted_config() _persisted_lane_overrides = _persisted_config.get("lane_overrides", {}) - _lane_overrides = {} + _fallback_lanes = {} try: from provider_profile import ProfileRegistry _registry = ProfileRegistry() _prov_name = main_orchestrator.provider.provider_name if _registry.has_provider(_prov_name): _profile = _registry.get_profile(_prov_name) - _lane_overrides["fast"] = _profile.fast.model - _lane_overrides["deep"] = _profile.deep.model + _fallback_lanes["fast"] = _profile.fast.model + _fallback_lanes["deep"] = _profile.deep.model if _profile.cache: - _lane_overrides["cache"] = _profile.cache.model + _fallback_lanes["cache"] = _profile.cache.model except Exception as exc: logger.warning("Model discovery profile lookup failed; using persisted/env overrides only: %s", exc) - _lane_overrides.update(_persisted_lane_overrides) + _lane_overrides = dict(_persisted_lane_overrides) - discovery = ModelDiscovery( - provider=main_orchestrator.provider, - profiles_path="config/model_profiles.yaml", - lane_overrides=_lane_overrides, - ) + try: + discovery = ModelDiscovery( + provider=main_orchestrator.provider, + profiles_path="config/model_profiles.yaml", + lane_overrides=_lane_overrides, + fallback_lanes=_fallback_lanes, + ) + except TypeError: + discovery = ModelDiscovery( + provider=main_orchestrator.provider, + profiles_path="config/model_profiles.yaml", + lane_overrides=_lane_overrides, + ) discovery.refresh() + for _lane, _model_id in discovery.lane_assignments.items(): + try: + main_orchestrator.set_lane_model(_lane, _model_id) + except Exception as _e: + logger.warning("Failed to apply lane assignment %s=%s: %s", _lane, _model_id, _e) + for _lane, _model_id in _persisted_lane_overrides.items(): + if discovery.lane_assignments.get(_lane) == _model_id: + continue try: main_orchestrator.set_lane_model(_lane, _model_id) except Exception as _e: diff --git a/src/core/model_discovery.py b/src/core/model_discovery.py index 0fc217c..efc0aca 100644 --- a/src/core/model_discovery.py +++ b/src/core/model_discovery.py @@ -14,6 +14,7 @@ import logging import os +import re from datetime import datetime, timezone from pathlib import Path from typing import Optional @@ -51,12 +52,15 @@ def __init__( provider: ProviderClient, profiles_path: str = None, lane_overrides: Optional[dict] = None, + fallback_lanes: Optional[dict] = None, ): self._provider = provider self._profiles = _load_profiles(profiles_path) self._lane_overrides = lane_overrides or {} + self._fallback_lanes = fallback_lanes or {} self._discovered: list[ModelInfo] = [] self._lane_assignments: dict[str, str] = {} + self._lane_sources: dict[str, str] = {} self._last_refresh: Optional[datetime] = None @property @@ -71,6 +75,10 @@ def discovered_models(self) -> list[ModelInfo]: def lane_assignments(self) -> dict[str, str]: return dict(self._lane_assignments) + @property + def lane_sources(self) -> dict[str, str]: + return dict(self._lane_sources) + def refresh(self) -> None: """Query provider API for available models and assign to lanes.""" try: @@ -117,61 +125,81 @@ def _auto_assign_lanes(self) -> dict[str, str]: - deep lane: highest-capability model with tool support - cache lane: cheapest model (tools not required) - Lane overrides (from models.yaml config) take priority. + Explicit lane overrides take priority. Profile lane defaults are used + only as fallback when discovery cannot assign a lane. """ assignments = {} + sources = {} # Apply overrides first for lane in ("fast", "deep", "cache"): if lane in self._lane_overrides: assignments[lane] = self._lane_overrides[lane] + sources[lane] = "override" + + if self._discovered: + # Filter models with tool support + tool_models = [m for m in self._discovered if m.supports_tools] + all_models = list(self._discovered) + + # Sort by cost (ascending) for fast/cache selection + tool_models_by_cost = sorted( + tool_models, + key=lambda m: m.output_cost_per_1k or 999.0, + ) + all_by_cost = sorted( + all_models, + key=lambda m: m.output_cost_per_1k or 999.0, + ) + + # Sort by capability for deep selection + _TIER_RANK = {"fast": 1, "standard": 2, "deep": 3} + tool_models_by_cap = sorted( + tool_models, + key=lambda m: ( + _TIER_RANK.get(m.capability_tier, 2), + _model_version_score(m.id), + m.context_window or 0, + m.output_cost_per_1k or 0.0, + ), + reverse=True, + ) + + # Fast lane: cheapest with tools + if "fast" not in assignments and tool_models_by_cost: + assignments["fast"] = tool_models_by_cost[0].id + sources["fast"] = "auto" + + # Deep lane: highest capability with tools + if "deep" not in assignments and tool_models_by_cap: + assignments["deep"] = tool_models_by_cap[0].id + sources["deep"] = "auto" + + # Cache lane: cheapest overall + if "cache" not in assignments: + if all_by_cost: + assignments["cache"] = all_by_cost[0].id + sources["cache"] = "auto" + elif "fast" in assignments: + assignments["cache"] = assignments["fast"] + sources["cache"] = sources.get("fast", "auto") + + for lane in ("fast", "deep", "cache"): + if lane not in assignments and lane in self._fallback_lanes: + assignments[lane] = self._fallback_lanes[lane] + sources[lane] = "fallback" + self._lane_sources = sources if not self._discovered: return assignments - # Filter models with tool support - tool_models = [m for m in self._discovered if m.supports_tools] - all_models = list(self._discovered) - - # Sort by cost (ascending) for fast/cache selection - tool_models_by_cost = sorted( - tool_models, - key=lambda m: m.output_cost_per_1k or 999.0, - ) - all_by_cost = sorted( - all_models, - key=lambda m: m.output_cost_per_1k or 999.0, - ) - - # Sort by capability for deep selection - _TIER_RANK = {"fast": 1, "standard": 2, "deep": 3} - tool_models_by_cap = sorted( - tool_models, - key=lambda m: _TIER_RANK.get(m.capability_tier, 2), - reverse=True, - ) - - # Fast lane: cheapest with tools - if "fast" not in assignments and tool_models_by_cost: - assignments["fast"] = tool_models_by_cost[0].id - - # Deep lane: highest capability with tools - if "deep" not in assignments and tool_models_by_cap: - assignments["deep"] = tool_models_by_cap[0].id - - # Cache lane: cheapest overall - if "cache" not in assignments: - if all_by_cost: - assignments["cache"] = all_by_cost[0].id - elif "fast" in assignments: - assignments["cache"] = assignments["fast"] - return assignments def set_lane_override(self, lane: str, model_id: str) -> None: """Override a single lane's model assignment at runtime.""" self._lane_overrides[lane] = model_id self._lane_assignments[lane] = model_id + self._lane_sources[lane] = "override" logger.info("Lane '%s' overridden to %s", lane, model_id) def reset_overrides(self) -> None: @@ -184,10 +212,13 @@ def replace_provider( self, new_provider: ProviderClient, lane_overrides: Optional[dict] = None, + fallback_lanes: Optional[dict] = None, ) -> None: """Hot-swap the underlying provider and re-run discovery.""" self._provider = new_provider self._lane_overrides = lane_overrides or {} + if fallback_lanes is not None: + self._fallback_lanes = fallback_lanes or {} self.refresh() def get_lane_model(self, lane: str) -> Optional[str]: @@ -233,6 +264,7 @@ def get_stack(self) -> dict: "context_window": profile.get("context_window", 0), "cost_output_per_1k": profile.get("cost_output_per_1k", 0.0), "supports_tools": profile.get("supports_tools", False), + "source": self._lane_sources.get(lane, "auto"), } return { @@ -255,4 +287,22 @@ def get_stack(self) -> dict: "last_refresh": ( self._last_refresh.isoformat() if self._last_refresh else None ), + "lane_sources": dict(self._lane_sources), } + + +def _model_version_score(model_id: str) -> tuple[int, ...]: + """Best-effort recency score for model IDs that encode versions.""" + model = model_id.lower() + match = re.search(r"gpt[-_]?(\d+(?:\.\d+){0,2})", model) + if not match: + match = re.search(r"gemini[-_](\d+(?:\.\d+){0,2})", model) + if not match: + match = re.search(r"grok[-_](\d+(?:\.\d+){0,2})", model) + if not match: + match = re.search(r"claude[-_](?:opus|sonnet|haiku)[-_](\d+)(?:[-_](\d+))?", model) + if match: + return tuple(int(part) for part in match.groups(default="0")) + if match: + return tuple(int(part) for part in match.group(1).split(".")) + return (0,) diff --git a/src/core/providers/api.py b/src/core/providers/api.py index dd2c25e..18473c5 100644 --- a/src/core/providers/api.py +++ b/src/core/providers/api.py @@ -269,7 +269,7 @@ def _codex_cli_auth_available() -> bool: return False -def _provider_profile_lane_overrides(provider_name: str) -> dict: +def _read_provider_profile_lanes(provider_name: str, label: str) -> dict: try: from provider_profile import ProfileRegistry @@ -285,13 +285,47 @@ def _provider_profile_lane_overrides(provider_name: str) -> dict: return lane_overrides except Exception as exc: logger.warning( - "Failed to seed provider lane overrides from profile '%s': %s", + "Failed to seed provider lane %s from profile '%s': %s", + label, provider_name, exc, ) return {} +def _provider_profile_lane_defaults(provider_name: str) -> dict: + return _read_provider_profile_lanes(provider_name, "defaults") + + +def _provider_profile_lane_overrides(provider_name: str) -> dict: + """Backward-compatible alias for tests and older call sites.""" + return _read_provider_profile_lanes(provider_name, "overrides") + + +def _create_model_discovery(provider, *, lane_overrides: dict, fallback_lanes: dict): + from model_discovery import ModelDiscovery + + try: + return ModelDiscovery( + provider, + lane_overrides=lane_overrides, + fallback_lanes=fallback_lanes, + ) + except TypeError: + return ModelDiscovery(provider, lane_overrides=lane_overrides) + + +def _replace_discovery_provider(discovery, new_provider, *, lane_overrides: dict, fallback_lanes: dict) -> None: + try: + discovery.replace_provider( + new_provider, + lane_overrides=lane_overrides, + fallback_lanes=fallback_lanes, + ) + except TypeError: + discovery.replace_provider(new_provider, lane_overrides=lane_overrides) + + # --------------------------------------------------------------------------- # Existing endpoints (v8.3.0) # --------------------------------------------------------------------------- @@ -479,19 +513,24 @@ def switch_provider(req: SwitchProviderRequest): # Read existing config to preserve lane overrides if desired config = _read_current_config() lane_overrides = config.get("lane_overrides", {}) - - # If no lane overrides, seed from models.yaml profile - if not lane_overrides: - lane_overrides = _provider_profile_lane_overrides(provider_name) + fallback_lanes = _provider_profile_lane_defaults(provider_name) # Replace provider in discovery and re-run global _discovery if _discovery: - _discovery.replace_provider(new_provider, lane_overrides=lane_overrides) + _replace_discovery_provider( + _discovery, + new_provider, + lane_overrides=lane_overrides, + fallback_lanes=fallback_lanes, + ) else: # Discovery wasn't initialized at startup (no provider). Create it now. - from model_discovery import ModelDiscovery - _discovery = ModelDiscovery(new_provider, lane_overrides=lane_overrides) + _discovery = _create_model_discovery( + new_provider, + lane_overrides=lane_overrides, + fallback_lanes=fallback_lanes, + ) _discovery.refresh() # Persist the switch @@ -742,10 +781,16 @@ def rotate_provider_key(req: RotateKeyRequest): config = _read_current_config() lane_overrides = config.get("lane_overrides", {}) + fallback_lanes = _provider_profile_lane_defaults(provider_name) if _discovery: new_provider = create_provider(provider_name, new_key) - _discovery.replace_provider(new_provider, lane_overrides=lane_overrides) + _replace_discovery_provider( + _discovery, + new_provider, + lane_overrides=lane_overrides, + fallback_lanes=fallback_lanes, + ) hot_swapped = True except Exception as e: diff --git a/src/core/providers/codex_cli_client.py b/src/core/providers/codex_cli_client.py index dd4e992..595451b 100644 --- a/src/core/providers/codex_cli_client.py +++ b/src/core/providers/codex_cli_client.py @@ -86,6 +86,7 @@ class CodexCLIProviderClient(ProviderClient): """ _CODEX_MODELS = [ + ModelInfo(id="gpt-5.5", display_name="GPT-5.5", supports_tools=True, capability_tier="deep"), ModelInfo(id="gpt-5.4", display_name="GPT-5.4", supports_tools=True, capability_tier="deep"), ModelInfo(id="gpt-5.4-mini", display_name="GPT-5.4 Mini", supports_tools=True, capability_tier="fast"), ModelInfo(id="gpt-5.4-nano", display_name="GPT-5.4 Nano", supports_tools=True, capability_tier="fast"), diff --git a/src/core/providers/codex_responses_client.py b/src/core/providers/codex_responses_client.py index 36309dd..0840c3f 100644 --- a/src/core/providers/codex_responses_client.py +++ b/src/core/providers/codex_responses_client.py @@ -27,8 +27,11 @@ class OpenAICodexResponsesProviderClient(ProviderClient): CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex" _DEFAULT_INSTRUCTIONS = "You are Lancelot's governed model planner." _CODEX_MODELS = [ + ModelInfo(id="gpt-5.5", display_name="GPT-5.5", supports_tools=True, + capability_tier="deep", context_window=1000000, + input_cost_per_1k=0.005, output_cost_per_1k=0.03), ModelInfo(id="gpt-5.4", display_name="GPT-5.4", supports_tools=True, - capability_tier="deep", context_window=1050000, + capability_tier="deep", context_window=1000000, input_cost_per_1k=0.0025, output_cost_per_1k=0.015), ModelInfo(id="gpt-5.4-mini", display_name="GPT-5.4 Mini", supports_tools=True, capability_tier="fast", context_window=272000, diff --git a/src/core/providers/openai_client.py b/src/core/providers/openai_client.py index 321055c..f2754f1 100644 --- a/src/core/providers/openai_client.py +++ b/src/core/providers/openai_client.py @@ -184,12 +184,15 @@ def build_user_message(self, text: str, images: Optional[list] = None) -> Any: # Legacy Codex OAuth model list retained for compatibility only. # Pricing: per 1M tokens → per 1K tokens (divide by 1000) _CODEX_MODELS = [ + ModelInfo(id="gpt-5.5", display_name="GPT-5.5", supports_tools=True, + capability_tier="deep", context_window=1000000, + input_cost_per_1k=0.005, output_cost_per_1k=0.03), ModelInfo(id="gpt-5.4", display_name="GPT-5.4", supports_tools=True, - capability_tier="deep", context_window=400000, - input_cost_per_1k=0.00125, output_cost_per_1k=0.01), + capability_tier="deep", context_window=1000000, + input_cost_per_1k=0.0025, output_cost_per_1k=0.015), ModelInfo(id="gpt-5.4-mini", display_name="GPT-5.4 Mini", supports_tools=True, capability_tier="fast", context_window=400000, - input_cost_per_1k=0.00025, output_cost_per_1k=0.002), + input_cost_per_1k=0.00075, output_cost_per_1k=0.0045), ModelInfo(id="gpt-5.4-nano", display_name="GPT-5.4 Nano", supports_tools=True, capability_tier="fast", context_window=400000, input_cost_per_1k=0.00005, output_cost_per_1k=0.0004), @@ -214,9 +217,16 @@ def list_models(self) -> list[ModelInfo]: continue tier = "standard" - if "mini" in model_id: + if "mini" in model_id or "nano" in model_id: tier = "fast" - elif "o1" in model_id or "o3" in model_id or "o4" in model_id: + elif ( + "o1" in model_id + or "o3" in model_id + or "o4" in model_id + or model_id.startswith("gpt-5") + or model_id.endswith("-pro") + or "-pro-" in model_id + ): tier = "deep" models.append(ModelInfo( diff --git a/src/federation/api.py b/src/federation/api.py index f125854..c349df4 100644 --- a/src/federation/api.py +++ b/src/federation/api.py @@ -24,7 +24,9 @@ import asyncio import json import logging +from datetime import datetime, timezone from typing import Any, Dict, Optional +from urllib.parse import quote, urlparse from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel, ConfigDict, Field, ValidationError @@ -40,6 +42,9 @@ tags=["federation"], ) +_COST_STALE_REASON_PREFIX = "Federation cost data stale for peer(s): " +_HEARTBEAT_STREAM_FAILED_PREFIX = "Federation heartbeat stream failed for peer(s): " + # Module-level state — set by init_federation_api() _identity = None _heartbeat_emitter = None @@ -215,6 +220,20 @@ class ManageKillRequest(BaseModel): target_ids: Optional[list[str]] = None +class DashboardDecisionRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + reason: str = Field(..., min_length=1, max_length=1000) + + +class FederatedDashboardDecisionRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + reason: str = Field(..., min_length=1, max_length=1000) + operator_identity: Dict[str, Any] = Field(default_factory=dict) + source_instance_id: str = "" + + def init_federation_api( identity, heartbeat_emitter, @@ -293,6 +312,21 @@ def _summarize_circuit_breakers(states: Dict[str, Dict[str, Any]]) -> Dict[str, return summary +def _current_federation_instance_ids() -> set[str]: + ids: set[str] = set() + if _identity and getattr(_identity, "instance_id", ""): + ids.add(str(_identity.instance_id)) + if _topology_registry is not None: + try: + for peer in _topology_registry.list_peers(): + peer_id = str(getattr(peer, "instance_id", "") or "") + if peer_id: + ids.add(peer_id) + except Exception as exc: + logger.debug("Failed to collect current federation instance IDs: %s", exc) + return ids + + async def _parse_request_model( request: Request, model_cls: type[BaseModel], @@ -423,7 +457,17 @@ def _build_runtime_status() -> Dict[str, Any]: ) degraded_reasons.append("Federation cost status unavailable") cost_threshold = budget_status.get("threshold", cost_threshold) - stale_instance_ids = list(budget_status.get("stale_instance_ids", []) or []) + raw_stale_ids = [ + str(instance_id) + for instance_id in list(budget_status.get("stale_instance_ids", []) or []) + if str(instance_id) + ] + current_ids = _current_federation_instance_ids() + stale_instance_ids = [ + instance_id + for instance_id in raw_stale_ids + if not current_ids or instance_id in current_ids + ] if stale_instance_ids: degraded_reasons.append( "Federation cost data stale for peer(s): " @@ -479,6 +523,1194 @@ def _build_runtime_status() -> Dict[str, Any]: } +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def _utc_now_iso() -> str: + return _utc_now().isoformat() + + +def _safe_int(value: Any, default: int = 0) -> int: + try: + return int(value or 0) + except (TypeError, ValueError): + return default + + +def _safe_float(value: Any, default: float = 0.0) -> float: + try: + return float(value or 0.0) + except (TypeError, ValueError): + return default + + +def _short_id(value: str) -> str: + if not value: + return "" + return value[:12] + + +def _elapsed_seconds(iso_timestamp: Optional[str]) -> Optional[float]: + if not iso_timestamp: + return None + try: + parsed = datetime.fromisoformat(iso_timestamp) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return max(0.0, (_utc_now() - parsed).total_seconds()) + except Exception: + return None + + +def _display_name_from_address(address: str) -> str: + if not address: + return "" + try: + parsed = urlparse(address) + return parsed.hostname or address.replace("https://", "").replace("http://", "") + except Exception: + return address.replace("https://", "").replace("http://", "") + + +def _command_center_url(address: str) -> str: + if not address: + return "" + base = str(address).rstrip("/") + if base.endswith("/war-room/command"): + return base + if base.endswith("/war-room"): + return f"{base}/command" + return f"{base}/war-room/command" + + +def _dashboard_instance_label_map() -> Dict[str, str]: + labels: Dict[str, str] = {} + if _identity and getattr(_identity, "instance_id", ""): + labels[str(_identity.instance_id)] = "Local Lancelot" + if _topology_registry is not None: + try: + for peer in _topology_registry.list_peers(): + peer_id = str(getattr(peer, "instance_id", "") or "") + if not peer_id: + continue + metadata = getattr(peer, "metadata", {}) or {} + address = str(getattr(peer, "address", "") or "") + labels[peer_id] = ( + str(metadata.get("instance_name", "")).strip() + or _display_name_from_address(address) + or f"Instance {_short_id(peer_id)}" + ) + except Exception as exc: + logger.debug("Failed to collect dashboard instance labels: %s", exc) + return labels + + +def _format_dashboard_instance_list(instance_ids: list[str]) -> str: + labels = _dashboard_instance_label_map() + return ", ".join( + labels.get(instance_id) or f"Instance {_short_id(instance_id)}" + for instance_id in instance_ids + ) + + +def _parse_reason_instance_ids(reason: str, prefix: str) -> list[str]: + if not reason.startswith(prefix): + return [] + return [ + item.strip() + for item in reason[len(prefix):].split(",") + if item.strip() + ] + + +def _dashboard_runtime_attention_reasons(runtime_status: Dict[str, Any]) -> list[str]: + current_ids = _current_federation_instance_ids() + formatted: list[str] = [] + for raw_reason in runtime_status.get("degraded_reasons", []) or []: + reason = str(raw_reason or "").strip() + if not reason: + continue + + stale_cost_ids = _parse_reason_instance_ids(reason, _COST_STALE_REASON_PREFIX) + if stale_cost_ids: + relevant_ids = [ + instance_id for instance_id in stale_cost_ids + if not current_ids or instance_id in current_ids + ] + if relevant_ids: + formatted.append( + "Cost telemetry stale for " + + _format_dashboard_instance_list(relevant_ids) + ) + continue + + failed_stream_ids = _parse_reason_instance_ids( + reason, + _HEARTBEAT_STREAM_FAILED_PREFIX, + ) + if failed_stream_ids: + relevant_ids = [ + instance_id for instance_id in failed_stream_ids + if not current_ids or instance_id in current_ids + ] + if relevant_ids: + formatted.append( + "Heartbeat stream failed for " + + _format_dashboard_instance_list(relevant_ids) + ) + continue + + formatted.append(reason) + return list(dict.fromkeys(formatted)) + + +def _dashboard_config_payload() -> Dict[str, Any]: + dashboard = getattr(_config, "dashboard", None) + return { + "enabled": bool(getattr(dashboard, "enabled", True)), + "poll_interval_s": _safe_float(getattr(dashboard, "poll_interval_s", 10.0), 10.0), + "stream_interval_s": _safe_float( + getattr(dashboard, "stream_interval_s", 3.0), + 3.0, + ), + "max_recent_activity_items": _safe_int( + getattr(dashboard, "max_recent_activity_items", 50), + 50, + ), + "card_sort_order": str(getattr(dashboard, "card_sort_order", "urgency") or "urgency"), + "show_fleet_activity_feed": bool( + getattr(dashboard, "show_fleet_activity_feed", True) + ), + "activity_feed_max_events": _safe_int( + getattr(dashboard, "activity_feed_max_events", 200), + 200, + ), + } + + +def _dashboard_disabled_reason() -> str: + try: + import feature_flags as ff + except Exception: + return "Feature flags unavailable" + + if not getattr(ff, "FEATURE_FEDERATION", False): + return "FEATURE_FEDERATION is disabled" + if not getattr(ff, "FEATURE_FEDERATION_DASHBOARD", False): + return "FEATURE_FEDERATION_DASHBOARD is disabled" + if not _dashboard_config_payload()["enabled"]: + return "Federation dashboard is disabled in config/federation.yaml" + return "" + + +def _empty_dashboard_snapshot(*, enabled: bool, disabled_reason: str = "") -> Dict[str, Any]: + return { + "enabled": enabled, + "disabled_reason": disabled_reason, + "generated_at": _utc_now_iso(), + "command_center_path": "/war-room/command", + "dashboard": _dashboard_config_payload(), + "fleet": { + "total_instances": 0, + "instances_needing_attention": 0, + "critical_instances": 0, + "lost_instances": 0, + "paused_instances": 0, + "pending_approvals": 0, + "trust_proposals": 0, + "active_agents": 0, + "fleet_cost_utilization_pct": 0.0, + "budget_threshold": "unknown", + "soul_consistency": "unknown", + }, + "instances": [], + "approvals": [], + "trust_proposals": [], + "activity": [], + "errors": [], + } + + +def _budget_threshold_for_pct(pct: float) -> str: + if pct >= 100.0: + return "hard_stop" + if pct >= 95.0: + return "spawn_gated" + if pct >= 85.0: + return "spawn_restricted" + if pct >= 75.0: + return "warning" + return "normal" + + +def _collect_cost_data() -> tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]: + by_instance: Dict[str, Dict[str, Any]] = {} + aggregate: Dict[str, Any] = { + "utilization_pct": 0.0, + "threshold": "unknown", + "stale_instance_ids": [], + } + if not _cost_reporter: + return by_instance, aggregate + + try: + status = _cost_reporter.get_aggregate_status() + if isinstance(status, dict): + aggregate.update(status) + if status.get("instance_id"): + by_instance[str(status["instance_id"])] = status + except Exception as exc: + aggregate["error"] = str(exc) + return by_instance, aggregate + + aggregator = getattr(_cost_reporter, "_cost_aggregator", None) + if aggregator is not None: + try: + for item in aggregator.get_all_instances(): + payload = item.to_dict() if hasattr(item, "to_dict") else dict(item) + instance_id = str(payload.get("instance_id", "")) + if instance_id: + by_instance[instance_id] = payload + except Exception as exc: + aggregate["instance_error"] = str(exc) + + return by_instance, aggregate + + +def _collect_local_hive_summary() -> Dict[str, int]: + summary = {"active_agents": 0, "paused_agents": 0} + try: + from src.hive import api as hive_api + + registry = getattr(hive_api, "_registry", None) + if registry is None: + return summary + agents = registry.list_active() + for agent in agents: + state = getattr(getattr(agent, "state", ""), "value", getattr(agent, "state", "")) + state = str(state).lower() + if state == "paused": + summary["paused_agents"] += 1 + elif state in {"spawning", "ready", "executing", "waiting", "completing"}: + summary["active_agents"] += 1 + if not agents and hasattr(registry, "active_count"): + summary["active_agents"] = _safe_int(registry.active_count()) + except Exception as exc: + logger.debug("Failed to collect local HIVE dashboard summary: %s", exc) + return summary + + +def _approval_context_from_params(params: Any) -> str: + if not params: + return "" + try: + return json.dumps(params, sort_keys=True)[:240] + except Exception: + return str(params)[:240] + + +def _collect_local_approvals(instance_id: str, instance_name: str) -> list[Dict[str, Any]]: + approvals: list[Dict[str, Any]] = [] + try: + from src.core import governance_api + + sentry = getattr(governance_api, "_mcp_sentry", None) + if sentry is not None: + cleanup = getattr(sentry, "cleanup_expired", None) + if callable(cleanup): + cleanup() + for approval_id, req in getattr(sentry, "pending_requests", {}).items(): + if str(req.get("status", "")).upper() != "PENDING": + continue + capability = str(req.get("tool", "unknown")) + approvals.append({ + "id": approval_id, + "instance_id": instance_id, + "instance_name": instance_name, + "type": "sentry", + "action_name": capability, + "risk_tier": str(req.get("risk_tier") or "T3"), + "capability": capability, + "context": _approval_context_from_params(req.get("params", {})), + "created_at": req.get("timestamp", ""), + "waiting_since": req.get("timestamp", ""), + }) + + rule_engine = getattr(governance_api, "_rule_engine", None) + if rule_engine is not None: + for rule in rule_engine.list_rules(status="proposed"): + approvals.append({ + "id": rule.id, + "instance_id": instance_id, + "instance_name": instance_name, + "type": "apl_rule", + "action_name": getattr(rule, "name", "Approval learning rule"), + "risk_tier": "T2", + "capability": "approval_learning.rule", + "context": getattr(rule, "description", ""), + "created_at": getattr(rule, "created_at", ""), + "waiting_since": getattr(rule, "created_at", ""), + }) + except Exception as exc: + logger.debug("Failed to collect local approval dashboard data: %s", exc) + return approvals + + +def _collect_local_trust_proposals(instance_id: str, instance_name: str) -> list[Dict[str, Any]]: + proposals: list[Dict[str, Any]] = [] + try: + from src.core import trust_api, governance_api + + ledger = getattr(trust_api, "_trust_ledger", None) or getattr( + governance_api, + "_trust_ledger", + None, + ) + if ledger is None: + return proposals + for proposal in ledger.pending_proposals(): + proposals.append({ + "id": proposal.id, + "instance_id": instance_id, + "instance_name": instance_name, + "capability": proposal.capability, + "scope": proposal.scope, + "current_tier": int(proposal.current_tier), + "proposed_tier": int(proposal.proposed_tier), + "consecutive_successes": proposal.consecutive_successes, + "status": proposal.status, + "created_at": proposal.created_at, + }) + except Exception as exc: + logger.debug("Failed to collect local trust dashboard data: %s", exc) + return proposals + + +def _receipt_payload_value(receipt: Any, key: str) -> str: + for attr in ("metadata", "outputs", "inputs"): + payload = getattr(receipt, attr, {}) or {} + if isinstance(payload, dict) and payload.get(key): + return str(payload[key]) + return "" + + +def _receipt_activity_description(receipt: Any) -> str: + action_name = str(getattr(receipt, "action_name", "") or "").strip() + if action_name: + return action_name + for key in ("description", "message", "event", "phase", "capability"): + value = _receipt_payload_value(receipt, key) + if value: + return value + return str(getattr(receipt, "action_type", "") or "receipt") + + +def _collect_local_activity(instance_id: str, instance_name: str, limit: int) -> list[Dict[str, Any]]: + events: list[Dict[str, Any]] = [] + try: + from src.core.receipts_api import get_receipt_service_instance + + service = get_receipt_service_instance() + if service is None: + return events + receipts = service.list(limit=limit) + for receipt in receipts: + metadata = getattr(receipt, "metadata", {}) or {} + operator = ( + metadata.get("operator_id") + or metadata.get("operator") + or metadata.get("actor") + or "" + ) + events.append({ + "id": getattr(receipt, "id", ""), + "timestamp": getattr(receipt, "timestamp", ""), + "instance_id": instance_id, + "instance_name": instance_name, + "event_type": getattr(receipt, "action_type", ""), + "description": _receipt_activity_description(receipt), + "operator": operator, + "status": getattr(receipt, "status", ""), + }) + except Exception as exc: + logger.debug("Failed to collect local dashboard activity: %s", exc) + return events + + +def _collect_runtime_pause() -> Dict[str, Any]: + try: + from src.core.runtime_pause import get_runtime_pause_status + + status = get_runtime_pause_status() + return status if isinstance(status, dict) else {} + except Exception as exc: + logger.debug("Failed to collect runtime pause status: %s", exc) + return {} + + +def _collect_local_health_state() -> tuple[str, list[str]]: + try: + from health import api as health_api + + if getattr(health_api, "_snapshot_provider", None) is None: + return "healthy", [] + snapshot = health_api._get_snapshot() + reasons = [ + str(reason) + for reason in getattr(snapshot, "degraded_reasons", []) or [] + if str(reason) + ] + if getattr(snapshot, "ready", False): + return "healthy", [] + if not getattr(snapshot, "last_health_tick_at", None) and not reasons: + return "healthy", [] + return "degraded", reasons or ["System health degraded"] + except Exception as exc: + logger.debug("Failed to collect local health dashboard state: %s", exc) + return "healthy", [] + + +def _derive_attention_state(instance: Dict[str, Any]) -> tuple[str, list[str]]: + reasons = list(instance.get("attention_reasons", []) or []) + heartbeat_state = str(instance.get("heartbeat_state", "")).lower() + health = str(instance.get("health", "")).lower() + budget_threshold = str(instance.get("budget_threshold", "")).lower() + soul_matches_root = instance.get("soul_matches_root") + pending_approvals = _safe_int(instance.get("pending_approvals")) + trust_proposals = _safe_int(instance.get("trust_proposals")) + paused_agents = _safe_int(instance.get("paused_agents")) + runtime_errors = instance.get("runtime_errors", []) or [] + detail_status = str(instance.get("detail_status", "available")).lower() + paused = bool(instance.get("paused")) or paused_agents > 0 + + if heartbeat_state in {"critical", "lost"}: + reasons.append(f"Heartbeat {heartbeat_state}") + elif heartbeat_state == "warning": + reasons.append("Heartbeat warning") + + if health in {"degraded", "error"}: + reasons.append(f"Health {health}") + + if pending_approvals > 0: + reasons.append(f"{pending_approvals} pending approval(s)") + if trust_proposals > 0: + reasons.append(f"{trust_proposals} trust proposal(s)") + if budget_threshold in {"spawn_gated", "hard_stop", "blocked", "exceeded", "critical"}: + reasons.append(f"Budget {budget_threshold}") + elif budget_threshold in {"warning", "spawn_restricted", "restricted"}: + reasons.append(f"Budget {budget_threshold}") + if soul_matches_root is False: + reasons.append("Soul hash differs from root") + if detail_status != "available": + reasons.append("Remote detail unavailable") + if runtime_errors: + reasons.append("Runtime errors present") + if paused: + reasons.append("Runtime paused") + + deduped = list(dict.fromkeys(reason for reason in reasons if reason)) + + if paused: + state = "paused" + elif ( + heartbeat_state in {"critical", "lost"} + or health == "error" + or budget_threshold in {"spawn_gated", "hard_stop", "blocked", "exceeded", "critical"} + or runtime_errors + ): + state = "critical" + elif deduped: + state = "attention" + else: + state = "healthy" + return state, deduped + + +def _build_local_instance_snapshot( + runtime_status: Dict[str, Any], + cost_by_instance: Dict[str, Dict[str, Any]], + aggregate_cost: Dict[str, Any], +) -> tuple[Dict[str, Any], list[Dict[str, Any]], list[Dict[str, Any]], list[Dict[str, Any]]]: + instance_id = _identity.instance_id if _identity else "" + instance_name = "Local Lancelot" + if _config and getattr(_config, "self_address", ""): + instance_name = _display_name_from_address(_config.self_address) or instance_name + + latest_hb = _heartbeat_emitter.get_latest() if _heartbeat_emitter else None + heartbeat_age = _elapsed_seconds(latest_hb.timestamp if latest_hb else None) + heartbeat_state = "fresh" if latest_hb else "lost" + soul_hash = ( + runtime_status.get("local_soul_hash") + or (latest_hb.soul_version_hash if latest_hb else "") + or "" + ) + + hive = _collect_local_hive_summary() + approvals = _collect_local_approvals(instance_id, instance_name) + trust_proposals = _collect_local_trust_proposals(instance_id, instance_name) + activity_limit = _dashboard_config_payload()["max_recent_activity_items"] + activity = _collect_local_activity(instance_id, instance_name, activity_limit) + recent = activity[0] if activity else None + pause_status = _collect_runtime_pause() + paused = bool(pause_status.get("paused", False)) + health, health_reasons = _collect_local_health_state() + attention_reasons = list(health_reasons) + attention_reasons.extend(_dashboard_runtime_attention_reasons(runtime_status)) + + cost = cost_by_instance.get(instance_id, {}) + budget_pct = _safe_float( + cost.get("utilization_pct") + or (latest_hb.budget_utilization_pct if latest_hb else 0.0) + ) + budget_threshold = ( + str(cost.get("threshold_level") or cost.get("threshold") or "") + or ( + str(aggregate_cost.get("threshold")) + if aggregate_cost.get("instance_count", 1) <= 1 + else "" + ) + or _budget_threshold_for_pct(budget_pct) + ) + + if paused: + health = "paused" + + instance = { + "instance_id": instance_id, + "instance_short_id": _short_id(instance_id), + "name": instance_name, + "role": "self", + "address": getattr(_config, "self_address", "") if _config else "", + "command_center_url": "/war-room/command", + "is_self": True, + "state": "healthy", + "health": health, + "heartbeat_state": heartbeat_state, + "heartbeat_age_s": heartbeat_age, + "last_heartbeat_at": latest_hb.timestamp if latest_hb else None, + "soul_version_hash": soul_hash, + "soul_matches_root": True if soul_hash else None, + "budget_utilization_pct": round(budget_pct, 1), + "budget_threshold": budget_threshold, + "active_agents": hive["active_agents"], + "paused_agents": hive["paused_agents"], + "pending_approvals": len(approvals), + "trust_proposals": len(trust_proposals), + "recent_activity": recent.get("description") if recent else "", + "recent_activity_at": recent.get("timestamp") if recent else None, + "attention_reasons": attention_reasons, + "runtime_errors": list(runtime_status.get("runtime_errors", []) or []), + "detail_status": "available", + "paused": paused, + "pause_reason": pause_status.get("reason"), + } + instance["state"], instance["attention_reasons"] = _derive_attention_state(instance) + return instance, approvals, trust_proposals, activity + + +def _peer_instance_base( + peer: Any, + *, + root_soul_hash: str, + cost_by_instance: Dict[str, Dict[str, Any]], + detail_error: str = "", +) -> Dict[str, Any]: + from src.federation.heartbeat import compute_staleness + + peer_id = str(getattr(peer, "instance_id", "")) + address = str(getattr(peer, "address", "")) + metadata = getattr(peer, "metadata", {}) or {} + instance_name = ( + str(metadata.get("instance_name", "")).strip() + or _display_name_from_address(address) + or f"Instance {_short_id(peer_id)}" + ) + last_heartbeat = getattr(peer, "last_heartbeat_at", None) + heartbeat_state, heartbeat_age = compute_staleness( + last_heartbeat, + warning_s=getattr(_config, "staleness_warning_s", 10.0), + critical_s=getattr(_config, "staleness_critical_s", 20.0), + lost_s=getattr(_config, "staleness_lost_s", 30.0), + ) + cost = cost_by_instance.get(peer_id, {}) + budget_pct = _safe_float(cost.get("utilization_pct")) + budget_threshold = str( + cost.get("threshold_level") + or cost.get("threshold") + or _budget_threshold_for_pct(budget_pct) + ) + soul_hash = str(getattr(peer, "soul_version_hash", "") or "") + soul_matches_root = None + if root_soul_hash and soul_hash: + soul_matches_root = soul_hash == root_soul_hash + + instance = { + "instance_id": peer_id, + "instance_short_id": _short_id(peer_id), + "name": instance_name, + "role": str(getattr(peer, "role", "peer") or "peer"), + "address": address, + "command_center_url": _command_center_url(address), + "is_self": False, + "state": "healthy", + "health": "unknown" if detail_error else "healthy", + "heartbeat_state": heartbeat_state, + "heartbeat_age_s": heartbeat_age if heartbeat_age >= 0 else None, + "last_heartbeat_at": last_heartbeat, + "soul_version_hash": soul_hash, + "soul_matches_root": soul_matches_root, + "budget_utilization_pct": round(budget_pct, 1), + "budget_threshold": budget_threshold, + "active_agents": 0, + "paused_agents": 0, + "pending_approvals": 0, + "trust_proposals": 0, + "recent_activity": "", + "recent_activity_at": None, + "attention_reasons": [detail_error] if detail_error else [], + "runtime_errors": [], + "detail_status": "unavailable" if detail_error else "topology_only", + "paused": False, + "pause_reason": "", + } + instance["state"], instance["attention_reasons"] = _derive_attention_state(instance) + return instance + + +def _merge_peer_detail(base: Dict[str, Any], detail: Dict[str, Any]) -> Dict[str, Any]: + merged = dict(base) + passthrough_fields = { + "health", + "active_agents", + "paused_agents", + "pending_approvals", + "trust_proposals", + "recent_activity", + "recent_activity_at", + "runtime_errors", + "paused", + "pause_reason", + } + for field in passthrough_fields: + if field in detail: + merged[field] = detail[field] + + if detail.get("name"): + merged["name"] = detail["name"] + if detail.get("soul_version_hash") and not merged.get("soul_version_hash"): + merged["soul_version_hash"] = detail["soul_version_hash"] + if detail.get("budget_utilization_pct"): + merged["budget_utilization_pct"] = detail["budget_utilization_pct"] + if detail.get("budget_threshold"): + merged["budget_threshold"] = detail["budget_threshold"] + + detail_reasons = list(detail.get("attention_reasons", []) or []) + merged["attention_reasons"] = list( + dict.fromkeys(list(merged.get("attention_reasons", []) or []) + detail_reasons) + ) + merged["detail_status"] = "available" + merged["state"], merged["attention_reasons"] = _derive_attention_state(merged) + return merged + + +def _normalize_remote_rows( + rows: Any, + *, + instance_id: str, + instance_name: str, +) -> list[Dict[str, Any]]: + normalized: list[Dict[str, Any]] = [] + if not isinstance(rows, list): + return normalized + for row in rows: + if not isinstance(row, dict): + continue + item = dict(row) + item["instance_id"] = item.get("instance_id") or instance_id + item["instance_name"] = item.get("instance_name") or instance_name + normalized.append(item) + return normalized + + +def _sort_dashboard_instances(instances: list[Dict[str, Any]]) -> list[Dict[str, Any]]: + sort_order = _dashboard_config_payload()["card_sort_order"] + if sort_order == "alphabetical": + return sorted(instances, key=lambda item: str(item.get("name", "")).lower()) + if sort_order == "role": + return sorted(instances, key=lambda item: (str(item.get("role", "")), str(item.get("name", "")))) + + severity = {"critical": 0, "attention": 1, "paused": 2, "healthy": 3} + return sorted( + instances, + key=lambda item: ( + severity.get(str(item.get("state", "healthy")), 4), + -_safe_int(item.get("pending_approvals")), + -_safe_int(item.get("trust_proposals")), + str(item.get("name", "")).lower(), + ), + ) + + +def _build_fleet_summary( + instances: list[Dict[str, Any]], + *, + aggregate_cost: Dict[str, Any], + runtime_status: Dict[str, Any], +) -> Dict[str, Any]: + return { + "total_instances": len(instances), + "instances_needing_attention": sum( + 1 for item in instances if item.get("state") in {"attention", "critical", "paused"} + ), + "critical_instances": sum(1 for item in instances if item.get("state") == "critical"), + "lost_instances": sum(1 for item in instances if item.get("heartbeat_state") == "lost"), + "paused_instances": sum(1 for item in instances if item.get("state") == "paused"), + "pending_approvals": sum(_safe_int(item.get("pending_approvals")) for item in instances), + "trust_proposals": sum(_safe_int(item.get("trust_proposals")) for item in instances), + "active_agents": sum(_safe_int(item.get("active_agents")) for item in instances), + "fleet_cost_utilization_pct": round(_safe_float(aggregate_cost.get("utilization_pct")), 1), + "budget_threshold": str(aggregate_cost.get("threshold", "unknown") or "unknown"), + "soul_consistency": str(runtime_status.get("soul_consistency", "unknown") or "unknown"), + } + + +async def _fetch_peer_dashboard_local(peer: Any) -> tuple[Optional[Dict[str, Any]], str]: + if _transport is None or not callable(getattr(_transport, "send", None)): + return None, "Federation transport not available for remote dashboard detail" + address = str(getattr(peer, "address", "") or "") + if not address: + return None, "Peer address unavailable" + timeout = min(_safe_float(getattr(_config, "command_timeout_s", 5.0), 5.0), 5.0) + result = await _transport.send( + peer_address=address, + method="GET", + path="/api/federation/dashboard/local", + peer_id=str(getattr(peer, "instance_id", "")), + timeout_override_s=timeout, + ) + if not getattr(result, "success", False): + status_code = getattr(result, "status_code", None) + error = getattr(result, "error", "") or f"HTTP {status_code}" + return None, f"Remote dashboard detail unavailable: {error}" + body = getattr(result, "body", None) + if not isinstance(body, dict): + return None, "Remote dashboard detail returned an invalid payload" + if body.get("enabled") is False: + return None, body.get("disabled_reason") or "Remote dashboard disabled" + return body, "" + + +async def _build_dashboard_snapshot(*, include_remote: bool) -> Dict[str, Any]: + disabled_reason = _dashboard_disabled_reason() + if disabled_reason: + return _empty_dashboard_snapshot(enabled=False, disabled_reason=disabled_reason) + + runtime_status = _build_runtime_status() + cost_by_instance, aggregate_cost = _collect_cost_data() + local_instance, approvals, trust_proposals, activity = _build_local_instance_snapshot( + runtime_status, + cost_by_instance, + aggregate_cost, + ) + + instances = [local_instance] + errors: list[Dict[str, Any]] = [] + + peers = _topology_registry.list_peers() if _topology_registry else [] + root_soul_hash = str(runtime_status.get("local_soul_hash", "") or "") + + if include_remote and peers: + remote_results = await asyncio.gather( + *[_fetch_peer_dashboard_local(peer) for peer in peers], + return_exceptions=True, + ) + for peer, result in zip(peers, remote_results): + detail: Optional[Dict[str, Any]] = None + detail_error = "" + if isinstance(result, Exception): + detail_error = f"Remote dashboard detail failed: {result}" + else: + detail, detail_error = result + + base = _peer_instance_base( + peer, + root_soul_hash=root_soul_hash, + cost_by_instance=cost_by_instance, + detail_error=detail_error, + ) + + if detail: + remote_instances = detail.get("instances", []) + remote_instance = remote_instances[0] if remote_instances else {} + if isinstance(remote_instance, dict): + base = _merge_peer_detail(base, remote_instance) + instance_id = str(base.get("instance_id", "")) + instance_name = str(base.get("name", "")) + approvals.extend( + _normalize_remote_rows( + detail.get("approvals", []), + instance_id=instance_id, + instance_name=instance_name, + ) + ) + trust_proposals.extend( + _normalize_remote_rows( + detail.get("trust_proposals", []), + instance_id=instance_id, + instance_name=instance_name, + ) + ) + activity.extend( + _normalize_remote_rows( + detail.get("activity", []), + instance_id=instance_id, + instance_name=instance_name, + ) + ) + elif detail_error: + errors.append({ + "instance_id": getattr(peer, "instance_id", ""), + "message": detail_error, + }) + instances.append(base) + elif include_remote: + for peer in peers: + instances.append( + _peer_instance_base( + peer, + root_soul_hash=root_soul_hash, + cost_by_instance=cost_by_instance, + ) + ) + + instances = _sort_dashboard_instances(instances) + approvals.sort( + key=lambda item: ( + 0 if str(item.get("risk_tier", "")).upper() == "T3" else 1, + str(item.get("created_at") or item.get("waiting_since") or ""), + ) + ) + trust_proposals.sort(key=lambda item: str(item.get("created_at", ""))) + activity.sort(key=lambda item: str(item.get("timestamp", "")), reverse=True) + if not _dashboard_config_payload()["show_fleet_activity_feed"]: + activity = [] + else: + activity = activity[:_dashboard_config_payload()["activity_feed_max_events"]] + + return { + "enabled": True, + "disabled_reason": "", + "generated_at": _utc_now_iso(), + "command_center_path": "/war-room/command", + "dashboard": _dashboard_config_payload(), + "fleet": _build_fleet_summary( + instances, + aggregate_cost=aggregate_cost, + runtime_status=runtime_status, + ), + "instances": instances, + "approvals": approvals, + "trust_proposals": trust_proposals, + "activity": activity, + "errors": errors, + } + + +def _require_dashboard_enabled_for_action() -> None: + disabled_reason = _dashboard_disabled_reason() + if disabled_reason: + raise HTTPException(status_code=403, detail=disabled_reason) + + +def _clean_decision_reason(reason: str) -> str: + cleaned = str(reason or "").strip() + if not cleaned: + raise HTTPException(status_code=400, detail="A decision reason is required") + return cleaned + + +def _dashboard_operator_identity_from_request(request: Request): + from src.core.auth_api import resolve_authenticated_identity + + identity = resolve_authenticated_identity(request) + if identity is None or not getattr(identity, "is_valid", False): + raise HTTPException(status_code=401, detail="Operator identity is required") + return identity + + +def _operator_identity_from_payload(payload: Dict[str, Any]): + from src.core.operator_identity import OperatorIdentity + + try: + identity = OperatorIdentity.from_dict(payload or {}) + except Exception as exc: + raise HTTPException(status_code=422, detail="Invalid operator_identity payload") from exc + if not identity.is_valid: + raise HTTPException(status_code=401, detail="Valid operator_identity is required") + return identity + + +def _require_dashboard_operator_decision_capabilities(request: Request) -> None: + if getattr(request.state, "federation_auth_mode", "") == "root_peer": + return + + from src.core.auth_api import request_has_capability + + missing = [ + capability + for capability in ("federation.admin", "governance.admin") + if not request_has_capability(request, capability) + ] + if missing: + raise HTTPException(status_code=403, detail=f"Missing capability: {missing[0]}") + + +def _apply_local_dashboard_decision( + approval_id: str, + *, + decision: str, + reason: str, + identity: Any, +) -> Dict[str, Any]: + from src.core import governance_api + + normalized = str(decision or "").lower() + if normalized == "approve": + result = governance_api._approve_item_direct( + approval_id, + reason=reason, + identity=identity, + ) + elif normalized == "deny": + result = governance_api._deny_item_direct( + approval_id, + reason=reason, + identity=identity, + ) + else: + raise HTTPException(status_code=400, detail=f"Unsupported decision: {decision}") + + if result is None: + raise HTTPException(status_code=404, detail=f"Approval item {approval_id} not found") + if not isinstance(result, dict): + return {"status": normalized, "id": approval_id, "result": result} + return result + + +def _receipt_action_type_for_dashboard_decision(result: Dict[str, Any], decision: str): + from src.shared.receipts import ActionType + + result_type = str(result.get("type", "")).lower() + approved = decision == "approve" + if result_type == "sentry": + return ActionType.MCP_T3_APPROVED if approved else ActionType.MCP_T3_REJECTED + if result_type == "apl_rule": + return ActionType.APL_RULE_APPROVED if approved else ActionType.APL_RULE_REJECTED + return ActionType.T3_APPROVED if approved else ActionType.T3_REJECTED + + +def _emit_dashboard_proxy_receipt( + *, + identity: Any, + decision: str, + target_instance_id: str, + approval_id: str, + result: Dict[str, Any], +) -> None: + from src.core.governance_receipts import emit_governance_receipt_for_identity + + action_type = _receipt_action_type_for_dashboard_decision(result, decision) + source_instance_id = _identity.instance_id if _identity else "" + emit_governance_receipt_for_identity( + identity, + action_type, + action_name=f"federation_approval_proxy_{decision}", + inputs={ + "approval_id": approval_id, + "target_instance_id": target_instance_id, + "source_instance_id": source_instance_id, + "decision": decision, + "result_type": result.get("type", ""), + }, + outputs={"remote_result": result}, + metadata={ + "federated_proxy": True, + "target_instance_id": target_instance_id, + "source_instance_id": source_instance_id, + "operator_id": getattr(identity, "operator_id", ""), + }, + ) + + +def _find_dashboard_peer(instance_id: str): + if not _topology_registry: + raise HTTPException(status_code=404, detail="Federation topology is unavailable") + peer = _topology_registry.get_peer(instance_id) + if peer is None: + raise HTTPException(status_code=404, detail=f"Federation instance {instance_id} not found") + return peer + + +async def _send_dashboard_decision_to_peer( + peer: Any, + *, + approval_id: str, + decision: str, + reason: str, + identity: Any, +) -> Dict[str, Any]: + if _transport is None or not callable(getattr(_transport, "send", None)): + raise HTTPException(status_code=503, detail="Federation transport not available") + + address = str(getattr(peer, "address", "") or "") + if not address: + raise HTTPException(status_code=503, detail="Federation peer address unavailable") + + peer_id = str(getattr(peer, "instance_id", "") or "") + timeout = min(_safe_float(getattr(_config, "command_timeout_s", 5.0), 5.0), 10.0) + result = await _transport.send( + peer_address=address, + method="POST", + path=( + "/api/federation/dashboard/local/approvals/" + f"{quote(approval_id, safe='')}/{decision}" + ), + body={ + "reason": reason, + "operator_identity": identity.to_dict(), + "source_instance_id": _identity.instance_id if _identity else "", + }, + peer_id=peer_id, + timeout_override_s=timeout, + ) + + body = getattr(result, "body", None) + if not getattr(result, "success", False): + status_code = getattr(result, "status_code", 0) or 502 + if status_code < 400 or status_code > 599: + status_code = 502 + detail = "" + if isinstance(body, dict): + detail = str(body.get("detail") or body.get("error") or "") + detail = detail or getattr(result, "error", "") or f"HTTP {status_code}" + raise HTTPException(status_code=status_code, detail=detail) + + if not isinstance(body, dict): + raise HTTPException(status_code=502, detail="Remote approval decision returned invalid payload") + return body + + +async def _handle_federation_dashboard_decision( + request: Request, + *, + instance_id: str, + approval_id: str, + decision: str, + reason: str, +) -> JSONResponse: + if not _initialized: + return _not_initialized() + _require_dashboard_enabled_for_action() + + normalized_decision = str(decision or "").lower() + clean_reason = _clean_decision_reason(reason) + identity = _dashboard_operator_identity_from_request(request) + local_instance_id = _identity.instance_id if _identity else "" + + if instance_id == local_instance_id: + result = _apply_local_dashboard_decision( + approval_id, + decision=normalized_decision, + reason=clean_reason, + identity=identity, + ) + return JSONResponse( + status_code=200, + content={ + "success": True, + "decision": normalized_decision, + "instance_id": instance_id, + "approval_id": approval_id, + "result": result, + }, + ) + + peer = _find_dashboard_peer(instance_id) + remote_body = await _send_dashboard_decision_to_peer( + peer, + approval_id=approval_id, + decision=normalized_decision, + reason=clean_reason, + identity=identity, + ) + if remote_body.get("success") is False: + raise HTTPException( + status_code=502, + detail=remote_body.get("error") or remote_body.get("detail") or "Remote decision failed", + ) + + remote_result = remote_body.get("result") + if not isinstance(remote_result, dict): + remote_result = {"result": remote_result} + _emit_dashboard_proxy_receipt( + identity=identity, + decision=normalized_decision, + target_instance_id=instance_id, + approval_id=approval_id, + result=remote_result, + ) + return JSONResponse( + status_code=200, + content={ + "success": True, + "decision": normalized_decision, + "instance_id": instance_id, + "approval_id": approval_id, + "result": remote_result, + "remote": remote_body, + }, + ) + + +def _handle_local_dashboard_decision( + request: Request, + *, + approval_id: str, + decision: str, + body: FederatedDashboardDecisionRequest, +) -> JSONResponse: + if not _initialized: + return _not_initialized() + _require_dashboard_enabled_for_action() + _require_dashboard_operator_decision_capabilities(request) + + identity = ( + _operator_identity_from_payload(body.operator_identity) + if body.operator_identity + else _dashboard_operator_identity_from_request(request) + ) + clean_reason = _clean_decision_reason(body.reason) + normalized_decision = str(decision or "").lower() + result = _apply_local_dashboard_decision( + approval_id, + decision=normalized_decision, + reason=clean_reason, + identity=identity, + ) + return JSONResponse( + status_code=200, + content={ + "success": True, + "decision": normalized_decision, + "instance_id": _identity.instance_id if _identity else "", + "approval_id": approval_id, + "source_instance_id": body.source_instance_id, + "result": result, + }, + ) + + def _not_initialized() -> JSONResponse: return JSONResponse( status_code=503, @@ -544,6 +1776,34 @@ async def _require_operator_or_valid_peer_request(request: Request) -> None: await _require_valid_peer_request(request) +async def _require_operator_or_root_peer_request(request: Request) -> None: + """Allow either a local operator or a signed ROOT federation peer request.""" + try: + require_authenticated_request(request) + request.state.federation_auth_mode = "operator" + return + except HTTPException as exc: + if exc.status_code not in {401, 503}: + raise + + if not _auth: + raise HTTPException(status_code=401, detail="Federation authentication is required") + + await _require_valid_peer_request(request) + peer_id = str(getattr(request.state, "federation_peer_instance_id", "") or "") + if not peer_id: + raise HTTPException(status_code=401, detail="Signed federation peer identity is required") + + peer = _topology_registry.get_peer(peer_id) if _topology_registry else None + role = str(getattr(peer, "role", "") or "").lower() if peer else "" + if role != "root": + raise HTTPException( + status_code=403, + detail="Dashboard detail and decisions require ROOT peer authority", + ) + request.state.federation_auth_mode = "root_peer" + + # ═══════════════════════════════════════════════════════════════ # Category 1: State Stream # ═══════════════════════════════════════════════════════════════ @@ -974,6 +2234,141 @@ async def get_federation_health( }) +@router.get("/dashboard") +async def get_federation_dashboard( + _authn: None = Depends(require_authenticated_request), +): + """Return the operator fleet dashboard snapshot for this control plane.""" + if not _initialized: + return _not_initialized() + + snapshot = await _build_dashboard_snapshot(include_remote=True) + return JSONResponse(status_code=200, content=snapshot) + + +@router.get("/dashboard/stream") +async def stream_federation_dashboard( + _authn: None = Depends(require_authenticated_request), +): + """Stream live fleet dashboard snapshots for the operator control plane.""" + if not _initialized: + return _not_initialized() + + async def event_generator(): + while True: + try: + snapshot = await _build_dashboard_snapshot(include_remote=True) + yield f"event: snapshot\ndata: {json.dumps(snapshot)}\n\n" + interval = _safe_float( + snapshot.get("dashboard", {}).get("stream_interval_s"), + 3.0, + ) + await asyncio.sleep(max(1.0, min(interval, 30.0))) + except asyncio.CancelledError: + logger.debug("Federation dashboard stream cancelled") + raise + except Exception as exc: + logger.warning("Federation dashboard stream snapshot failed: %s", exc) + payload = {"error": str(exc), "generated_at": _utc_now_iso()} + yield f"event: dashboard_error\ndata: {json.dumps(payload)}\n\n" + await asyncio.sleep(3.0) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@router.get("/dashboard/local") +async def get_local_federation_dashboard( + request: Request, + _authn_or_peer: None = Depends(_require_operator_or_root_peer_request), +): + """Return this instance's local dashboard detail for an operator or ROOT peer.""" + if not _initialized: + return _not_initialized() + + snapshot = await _build_dashboard_snapshot(include_remote=False) + return JSONResponse(status_code=200, content=snapshot) + + +@router.post("/dashboard/instances/{instance_id}/approvals/{approval_id}/approve") +async def approve_federation_dashboard_approval( + request: Request, + instance_id: str, + approval_id: str, + body: DashboardDecisionRequest, + _authn: None = Depends(require_authenticated_request), + _federation_capability: None = Depends(require_operator_capability("federation.admin")), + _governance_capability: None = Depends(require_operator_capability("governance.admin")), +): + """Approve a pending governance item on any federated dashboard instance.""" + return await _handle_federation_dashboard_decision( + request, + instance_id=instance_id, + approval_id=approval_id, + decision="approve", + reason=body.reason, + ) + + +@router.post("/dashboard/instances/{instance_id}/approvals/{approval_id}/deny") +async def deny_federation_dashboard_approval( + request: Request, + instance_id: str, + approval_id: str, + body: DashboardDecisionRequest, + _authn: None = Depends(require_authenticated_request), + _federation_capability: None = Depends(require_operator_capability("federation.admin")), + _governance_capability: None = Depends(require_operator_capability("governance.admin")), +): + """Deny a pending governance item on any federated dashboard instance.""" + return await _handle_federation_dashboard_decision( + request, + instance_id=instance_id, + approval_id=approval_id, + decision="deny", + reason=body.reason, + ) + + +@router.post("/dashboard/local/approvals/{approval_id}/approve") +async def approve_local_dashboard_approval( + request: Request, + approval_id: str, + body: FederatedDashboardDecisionRequest, + _authn_or_peer: None = Depends(_require_operator_or_root_peer_request), +): + """Approve a local pending governance item for a federated ROOT dashboard.""" + return _handle_local_dashboard_decision( + request, + approval_id=approval_id, + decision="approve", + body=body, + ) + + +@router.post("/dashboard/local/approvals/{approval_id}/deny") +async def deny_local_dashboard_approval( + request: Request, + approval_id: str, + body: FederatedDashboardDecisionRequest, + _authn_or_peer: None = Depends(_require_operator_or_root_peer_request), +): + """Deny a local pending governance item for a federated ROOT dashboard.""" + return _handle_local_dashboard_decision( + request, + approval_id=approval_id, + decision="deny", + body=body, + ) + + @router.get("/peers") async def get_peers( _authn: None = Depends(require_authenticated_request), diff --git a/src/federation/config.py b/src/federation/config.py index f4286e4..a45b943 100644 --- a/src/federation/config.py +++ b/src/federation/config.py @@ -30,6 +30,18 @@ class DeploymentMode(str, Enum): FEDERATED = "federated" +class FederationDashboardConfig(BaseModel): + """Configuration for the operator-facing fleet dashboard.""" + + enabled: bool = Field(default=True) + poll_interval_s: float = Field(default=10.0, ge=2.0, le=120.0) + stream_interval_s: float = Field(default=3.0, ge=1.0, le=30.0) + max_recent_activity_items: int = Field(default=50, ge=1, le=500) + card_sort_order: str = Field(default="urgency") + show_fleet_activity_feed: bool = Field(default=True) + activity_feed_max_events: int = Field(default=200, ge=10, le=1000) + + class FederationConfig(BaseModel): """Configuration for the Federation subsystem.""" @@ -80,6 +92,9 @@ class FederationConfig(BaseModel): # This instance's externally-reachable address (for peer registration) self_address: str = Field(default="") + # Operator fleet dashboard + dashboard: FederationDashboardConfig = Field(default_factory=FederationDashboardConfig) + def get_federation_config_path(config_dir: Optional[str] = None) -> Path: """Return the canonical federation config path.""" diff --git a/src/warroom/src/App.tsx b/src/warroom/src/App.tsx index 05364df..5faaa10 100644 --- a/src/warroom/src/App.tsx +++ b/src/warroom/src/App.tsx @@ -24,6 +24,7 @@ const SkillsPanelPage = lazy(() => import('@/pages/SkillsPanel').then(module => const HiveAgentMeshPage = lazy(() => import('@/pages/HiveAgentMesh').then(module => ({ default: module.HiveAgentMesh }))) const LoginPageRoute = lazy(() => import('@/pages/LoginPage').then(module => ({ default: module.LoginPage }))) const LoginCallbackPageRoute = lazy(() => import('@/pages/LoginCallbackPage').then(module => ({ default: module.LoginCallbackPage }))) +const FleetDashboardPage = lazy(() => import('@/pages/FleetDashboard').then(module => ({ default: module.FleetDashboard }))) const FederationOverviewPage = lazy(() => import('@/pages/FederationOverview').then(module => ({ default: module.FederationOverview }))) const GraphBuilderPage = lazy(() => import('@/pages/GraphBuilder').then(module => ({ default: module.GraphBuilder }))) const FederationAuditPage = lazy(() => import('@/pages/FederationAudit').then(module => ({ default: module.FederationAudit }))) @@ -74,6 +75,7 @@ function App() { {/* FEDERATION */} + diff --git a/src/warroom/src/api/actioncards.ts b/src/warroom/src/api/actioncards.ts index 131b6d9..1facfae 100644 --- a/src/warroom/src/api/actioncards.ts +++ b/src/warroom/src/api/actioncards.ts @@ -1,9 +1,60 @@ import { apiGet, apiPost } from './client' -import type { ActionCardsPendingResponse, ActionCardResolveResponse } from '@/types/api' +import type { + ActionCardButton, + ActionCardData, + ActionCardResolveResponse, + ActionCardsPendingResponse, + ActionCardType, +} from '@/types/api' + +interface RawActionCardData { + card_id?: string + cardId?: string + card_type?: ActionCardType + cardType?: ActionCardType + quest_id?: string | null + questId?: string | null + source_system?: string + sourceSystem?: string + source_item_id?: string + sourceItemId?: string + title?: string + description?: string + buttons?: ActionCardButton[] + resolved?: boolean + resolved_action?: string + resolvedAction?: string + resolved_channel?: string + resolvedChannel?: string + created_at?: number + presentedAt?: number + resolved_at?: number + resolvedAt?: number +} + +function normalizeActionCard(card: RawActionCardData): ActionCardData { + return { + cardId: card.cardId ?? card.card_id ?? '', + cardType: card.cardType ?? card.card_type ?? 'info', + questId: card.questId ?? card.quest_id ?? null, + sourceSystem: card.sourceSystem ?? card.source_system, + sourceItemId: card.sourceItemId ?? card.source_item_id, + title: card.title ?? '', + description: card.description ?? '', + buttons: card.buttons ?? [], + resolved: card.resolved ?? false, + resolvedAction: card.resolvedAction ?? card.resolved_action, + resolvedChannel: card.resolvedChannel ?? card.resolved_channel, + presentedAt: card.presentedAt ?? card.created_at ?? Date.now() / 1000, + resolvedAt: card.resolvedAt ?? card.resolved_at, + } +} /** GET /api/actioncards/pending — Fetch all pending action cards */ -export function fetchPendingActionCards() { - return apiGet('/api/actioncards/?status=pending') +export async function fetchPendingActionCards(): Promise { + const response = await apiGet<{ cards: RawActionCardData[]; count: number }>('/api/actioncards/?status=pending') + const cards = response.cards.map(normalizeActionCard) + return { cards, count: response.count ?? cards.length } } /** POST /api/actioncards/:cardId/resolve — Resolve an action card */ diff --git a/src/warroom/src/api/client.ts b/src/warroom/src/api/client.ts index e5f7bdc..b90f9d7 100644 --- a/src/warroom/src/api/client.ts +++ b/src/warroom/src/api/client.ts @@ -54,7 +54,11 @@ async function handleResponse(res: Response): Promise { if (!res.ok) { // Session expired — redirect to login if (res.status === 401) { - window.location.href = '/war-room/login' + const currentPath = `${window.location.pathname}${window.location.search}${window.location.hash}` + const returnTo = currentPath.startsWith('/war-room') + ? currentPath.slice('/war-room'.length) || '/command' + : currentPath + window.location.href = `/war-room/login?return_to=${encodeURIComponent(returnTo)}` throw new ApiClientError(401, { error: 'Session expired', status: 401 }) } let body: ApiError diff --git a/src/warroom/src/api/federation.ts b/src/warroom/src/api/federation.ts index be333c8..669b656 100644 --- a/src/warroom/src/api/federation.ts +++ b/src/warroom/src/api/federation.ts @@ -79,6 +79,127 @@ export interface FederationHealthSummary { active_propagation_count?: number } +export type FleetInstanceState = 'healthy' | 'attention' | 'critical' | 'paused' + +export interface FleetDashboardConfig { + enabled: boolean + poll_interval_s: number + stream_interval_s: number + max_recent_activity_items: number + card_sort_order: string + show_fleet_activity_feed: boolean + activity_feed_max_events: number +} + +export interface FleetSummary { + total_instances: number + instances_needing_attention: number + critical_instances: number + lost_instances: number + paused_instances: number + pending_approvals: number + trust_proposals: number + active_agents: number + fleet_cost_utilization_pct: number + budget_threshold: string + soul_consistency: string +} + +export interface FleetDashboardInstance { + instance_id: string + instance_short_id: string + name: string + role: string + address: string + command_center_url: string + is_self: boolean + state: FleetInstanceState + health: string + heartbeat_state: string + heartbeat_age_s: number | null + last_heartbeat_at: string | null + soul_version_hash: string + soul_matches_root: boolean | null + budget_utilization_pct: number + budget_threshold: string + active_agents: number + paused_agents: number + pending_approvals: number + trust_proposals: number + recent_activity: string + recent_activity_at: string | null + attention_reasons: string[] + runtime_errors: string[] + detail_status: string + paused: boolean + pause_reason: string | null +} + +export interface FleetApproval { + id: string + instance_id: string + instance_name: string + type: string + action_name: string + risk_tier: string + capability: string + context: string + created_at: string + waiting_since: string +} + +export interface FleetTrustProposal { + id: string + instance_id: string + instance_name: string + capability: string + scope: string + current_tier: number + proposed_tier: number + consecutive_successes: number + status: string + created_at: string +} + +export interface FleetActivityEvent { + id: string + timestamp: string + instance_id: string + instance_name: string + event_type: string + description: string + operator: string + status: string +} + +export interface FleetDashboardError { + instance_id: string + message: string +} + +export interface FleetDashboardSnapshot { + enabled: boolean + disabled_reason: string + generated_at: string + command_center_path: string + dashboard: FleetDashboardConfig + fleet: FleetSummary + instances: FleetDashboardInstance[] + approvals: FleetApproval[] + trust_proposals: FleetTrustProposal[] + activity: FleetActivityEvent[] + errors: FleetDashboardError[] +} + +export interface FleetDecisionResponse { + success: boolean + decision: 'approve' | 'deny' + instance_id: string + approval_id: string + result: Record + remote?: Record +} + // Graph Builder types export interface TopologyNode { node_id: string @@ -322,6 +443,69 @@ export async function fetchFederationHealth(): Promise return apiGet('/api/federation/health') } +export async function fetchFederationDashboard(): Promise { + return apiGet('/api/federation/dashboard') +} + +function encodePathSegment(value: string): string { + return encodeURIComponent(value) +} + +export function subscribeFederationDashboard( + onSnapshot: (snapshot: FleetDashboardSnapshot) => void, + onError?: (error: Error) => void, + onConnectionChange?: (connected: boolean) => void, +): () => void { + const source = new EventSource('/api/federation/dashboard/stream', { withCredentials: true }) + + source.onopen = () => { + onConnectionChange?.(true) + } + source.onerror = () => { + onConnectionChange?.(false) + onError?.(new Error('Fleet dashboard stream disconnected')) + } + source.addEventListener('snapshot', (event) => { + try { + onSnapshot(JSON.parse((event as MessageEvent).data) as FleetDashboardSnapshot) + } catch (error) { + onError?.(error instanceof Error ? error : new Error(String(error))) + } + }) + source.addEventListener('dashboard_error', (event) => { + try { + const payload = JSON.parse((event as MessageEvent).data) as { error?: string } + onError?.(new Error(payload.error || 'Fleet dashboard stream snapshot failed')) + } catch { + onError?.(new Error('Fleet dashboard stream snapshot failed')) + } + }) + + return () => source.close() +} + +export async function approveFederationDashboardApproval( + instanceId: string, + approvalId: string, + reason: string, +): Promise { + return apiPost( + `/api/federation/dashboard/instances/${encodePathSegment(instanceId)}/approvals/${encodePathSegment(approvalId)}/approve`, + { reason }, + ) +} + +export async function denyFederationDashboardApproval( + instanceId: string, + approvalId: string, + reason: string, +): Promise { + return apiPost( + `/api/federation/dashboard/instances/${encodePathSegment(instanceId)}/approvals/${encodePathSegment(approvalId)}/deny`, + { reason }, + ) +} + // ── Graph Builder ──────────────────────────────────────────── export async function fetchActiveTopology(): Promise { diff --git a/src/warroom/src/api/providers.ts b/src/warroom/src/api/providers.ts index 66d9d6a..1f9b43c 100644 --- a/src/warroom/src/api/providers.ts +++ b/src/warroom/src/api/providers.ts @@ -10,6 +10,7 @@ export interface LaneAssignment { context_window: number cost_output_per_1k: number supports_tools: boolean + source?: string } export interface DiscoveredModel { @@ -29,6 +30,7 @@ export interface ProviderStackResponse { discovered_models: DiscoveredModel[] models_count: number last_refresh: string | null + lane_sources?: Record status: string } diff --git a/src/warroom/src/components/AuthGuard.tsx b/src/warroom/src/components/AuthGuard.tsx index 88a0a79..24c8f46 100644 --- a/src/warroom/src/components/AuthGuard.tsx +++ b/src/warroom/src/components/AuthGuard.tsx @@ -1,5 +1,5 @@ import { useEffect, useState, useCallback } from 'react' -import { Outlet, Navigate, useNavigate } from 'react-router-dom' +import { Outlet, Navigate, useLocation, useNavigate } from 'react-router-dom' import { validateSession, logout } from '@/api/auth' import { SessionExpiryModal } from './SessionExpiryModal' import { getErrorMessage } from '@/utils/errors' @@ -12,6 +12,7 @@ const WARNING_THRESHOLD_S = 300 // Show warning when <5 min remaining export function AuthGuard() { const navigate = useNavigate() + const location = useLocation() const [authState, setAuthState] = useState('checking') const [showExpiryModal, setShowExpiryModal] = useState(false) const [remainingSeconds, setRemainingSeconds] = useState(0) @@ -77,7 +78,13 @@ export function AuthGuard() { } if (authState === 'unauthenticated') { - return + return ( + + ) } return ( diff --git a/src/warroom/src/layouts/Sidebar.tsx b/src/warroom/src/layouts/Sidebar.tsx index b546205..c095911 100644 --- a/src/warroom/src/layouts/Sidebar.tsx +++ b/src/warroom/src/layouts/Sidebar.tsx @@ -60,6 +60,7 @@ const NAV_GROUPS: NavGroup[] = [ { title: 'FEDERATION', items: [ + { label: 'Fleet Dashboard', path: '/federation/fleet' }, { label: 'Overview', path: '/federation' }, { label: 'Graph Builder', path: '/federation/graph' }, { label: 'Audit Trail', path: '/federation/audit' }, diff --git a/src/warroom/src/layouts/VitalsBar.tsx b/src/warroom/src/layouts/VitalsBar.tsx index 8a93925..79e89df 100644 --- a/src/warroom/src/layouts/VitalsBar.tsx +++ b/src/warroom/src/layouts/VitalsBar.tsx @@ -21,13 +21,23 @@ function barColor(val: number, thresholds: [number, number] = [50, 90]) { function connectionState(health: HealthCheckResponse | null, ready: HealthReadyResponse | null) { if (!health && !ready) return { label: 'INITIALIZING', color: 'text-state-inactive', pulse: true } + if (ready?.ready) return { label: 'ACTIVE', color: 'text-state-healthy', pulse: false } + const components = health?.components ?? {} - const values = Object.values(components) - const allHealthy = values.every((v) => v === 'ok' || v === 'disabled') - const anyDegraded = values.some((v) => v === 'degraded') - const anyError = values.some((v) => v !== 'ok' && v !== 'disabled' && v !== 'degraded') - if (allHealthy) return { label: 'ACTIVE', color: 'text-state-healthy', pulse: false } - if (anyDegraded && !anyError) return { label: 'DEGRADED', color: 'text-state-degraded', pulse: false } + const coreComponents = ['gateway', 'orchestrator', 'local_llm'] + const coreValues = coreComponents + .map((key) => components[key]) + .filter((value): value is string => typeof value === 'string' && value.length > 0) + const gatewayOffline = health?.status && !['online', 'ok', 'healthy'].includes(health.status) + const coreBroken = coreValues.some((v) => !['ok', 'disabled', 'degraded'].includes(v)) + const coreDegraded = coreValues.some((v) => v === 'degraded') || ready?.ready === false + + if (!gatewayOffline && !coreBroken && !coreDegraded) { + return { label: 'ACTIVE', color: 'text-state-healthy', pulse: false } + } + if (!gatewayOffline && !coreBroken) { + return { label: 'DEGRADED', color: 'text-state-degraded', pulse: false } + } return { label: 'SEVERED', color: 'text-state-error', pulse: false } } @@ -187,7 +197,7 @@ export function VitalsBar() { {/* Connection */}
c.toUpperCase()) } +function startOfLocalDayIso(): string { + const start = new Date() + start.setHours(0, 0, 0, 0) + return start.toISOString() +} + +function actionCardTime(card: ActionCardData): string { + return formatTimeOnly(new Date(card.presentedAt * 1000).toISOString()) +} + // ── Component ─────────────────────────────────────────────────── export function CommandCenter() { usePageTitle('Command Center') const recentFetcher = useCallback(() => fetchReceipts({ limit: 8 }), []) + const todayStatsFetcher = useCallback(() => fetchReceiptStats(startOfLocalDayIso()), []) const { data: receiptsData } = usePolling({ fetcher: recentFetcher, interval: 15000 }) - const { data: statsData } = usePolling({ fetcher: fetchReceiptStats, interval: 30000 }) + const { data: statsData } = usePolling({ fetcher: todayStatsFetcher, interval: 30000 }) + const { data: approvalsData, error: approvalsError } = usePolling({ + fetcher: fetchGovernanceApprovals, + interval: 10000, + }) + const { data: actionCardsData, error: actionCardsError } = usePolling({ + fetcher: fetchPendingActionCards, + interval: 10000, + }) const receipts: ReceiptItem[] = receiptsData?.receipts ?? [] + const approvals: ApprovalItem[] = approvalsData?.approvals ?? [] + const actionCards: ActionCardData[] = actionCardsData?.cards.filter(card => !card.resolved) ?? [] const todayCount = statsData?.stats?.total_receipts ?? 0 - const pendingCount = statsData?.stats?.by_status?.['pending'] ?? 0 + const failedTodayCount = statsData?.stats?.by_status?.['failure'] ?? 0 + const pendingActionCount = approvals.length + actionCards.length + const pendingUnavailable = Boolean(approvalsError || actionCardsError) return (
@@ -88,10 +116,77 @@ export function CommandCenter() { {/* Right column: 1/3 width */}
- {/* Pending Actions — WR-15 */} + {/* Fleet dashboard entry */} +
+
+
+

Multi-Agent Dashboard

+

+ Fleet health, approvals, trust proposals, and instance entry points +

+
+
+ + Open Fleet Dashboard + +
+

Pending Actions

-

No pending actions

+ {pendingUnavailable ? ( +

Pending action data unavailable

+ ) : pendingActionCount === 0 ? ( +

No pending actions

+ ) : ( +
+ {approvals.slice(0, 4).map((item) => ( + +
+ + Approval + + + {formatTimeOnly(item.created_at)} + +
+

+ {item.name || item.capability || item.id} +

+ + ))} + {actionCards.slice(0, Math.max(0, 4 - approvals.length)).map((card) => ( + +
+ + Action Card + + + {actionCardTime(card)} + +
+

+ {card.title || card.cardId} +

+ + ))} + {pendingActionCount > 4 && ( +

+ {pendingActionCount - 4} more pending item{pendingActionCount - 4 === 1 ? '' : 's'}. +

+ )} +
+ )}
{/* Controls Panel */} @@ -106,8 +201,18 @@ export function CommandCenter() {

{todayCount}

- Pending -

{pendingCount}

+ Approvals +

{approvals.length}

+
+
+ Action Cards +

{actionCards.length}

+
+
+ Failed Today +

0 ? 'text-state-error' : 'text-text-primary'}`}> + {failedTodayCount} +

diff --git a/src/warroom/src/pages/CostTracker.tsx b/src/warroom/src/pages/CostTracker.tsx index 6d7b561..2e68ce4 100644 --- a/src/warroom/src/pages/CostTracker.tsx +++ b/src/warroom/src/pages/CostTracker.tsx @@ -29,6 +29,12 @@ const LANE_LABELS: Record = { cache: 'Cache', } +function laneSourceLabel(source?: string): string { + if (source === 'override') return 'Pinned' + if (source === 'fallback') return 'Default' + return 'Auto' +} + export function CostTracker() { usePageTitle('Cost Tracker') const { data: summary } = usePolling({ fetcher: fetchUsageSummary, interval: 15000 }) @@ -420,6 +426,7 @@ export function CostTracker() { Lane Model + Mode Context $/1K out Tools @@ -464,6 +471,15 @@ export function CostTracker() { )} {isLoading && ...} + + + {laneSourceLabel(l.source)} + + {formatCtx(l.context_window)} {l.cost_output_per_1k ? `$${l.cost_output_per_1k.toFixed(4)}` : '--'} diff --git a/src/warroom/src/pages/FleetDashboard.tsx b/src/warroom/src/pages/FleetDashboard.tsx new file mode 100644 index 0000000..cf31fa1 --- /dev/null +++ b/src/warroom/src/pages/FleetDashboard.tsx @@ -0,0 +1,662 @@ +import { useCallback, useEffect, useState } from 'react' +import { usePageTitle, usePolling } from '@/hooks' +import { + approveFederationDashboardApproval, + denyFederationDashboardApproval, + fetchFederationDashboard, + subscribeFederationDashboard, + type FleetActivityEvent, + type FleetApproval, + type FleetDashboardInstance, + type FleetTrustProposal, +} from '@/api/federation' +import { MetricCard } from '@/components/MetricCard' +import { PageLoader } from '@/components/PageLoader' +import { formatRelativeTime, formatTimestamp } from '@/utils/dateFormat' +import { getErrorMessage } from '@/utils/errors' + +const STATE_STYLES: Record = { + healthy: { + border: 'border-state-healthy/70', + dot: 'bg-state-healthy', + text: 'text-state-healthy', + label: 'Healthy', + }, + attention: { + border: 'border-state-warning/80', + dot: 'bg-state-warning', + text: 'text-state-warning', + label: 'Attention', + }, + critical: { + border: 'border-state-error', + dot: 'bg-state-error', + text: 'text-state-error', + label: 'Critical', + }, + paused: { + border: 'border-accent-primary', + dot: 'bg-accent-primary', + text: 'text-accent-primary', + label: 'Paused', + }, +} + +type DashboardDecision = 'approve' | 'deny' + +interface DecisionTarget { + decision: DashboardDecision + instanceId: string + approvalId: string + title: string + subtitle: string +} + +function roleLabel(role: string): string { + if (!role) return 'PEER' + return role.toUpperCase() +} + +function compactHash(hash: string): string { + return hash ? hash.slice(0, 10) : 'unavailable' +} + +function formatHeartbeat(instance: FleetDashboardInstance): string { + if (instance.heartbeat_state === 'lost') return 'LOST' + if (instance.heartbeat_age_s === null) return instance.heartbeat_state.toUpperCase() + if (instance.heartbeat_age_s < 60) return `${Math.round(instance.heartbeat_age_s)}s ago` + return formatRelativeTime(instance.last_heartbeat_at) +} + +function formatPct(value: number): string { + return `${Math.round(value)}%` +} + +function instanceGridClass(count: number): string { + if (count <= 1) return 'grid grid-cols-1 gap-4 lg:max-w-2xl' + if (count === 2) return 'grid grid-cols-1 gap-4 lg:grid-cols-2 xl:max-w-5xl' + return 'grid grid-cols-1 gap-4 md:grid-cols-2 xl:grid-cols-3' +} + +function DetailPill({ label, value, tone = 'muted' }: { + label: string + value: string | number + tone?: 'muted' | 'warning' | 'error' | 'healthy' +}) { + const toneClass = { + muted: 'text-text-secondary bg-surface-input', + warning: 'text-state-warning bg-state-warning/10', + error: 'text-state-error bg-state-error/10', + healthy: 'text-state-healthy bg-state-healthy/10', + }[tone] + + return ( +
+
{label}
+
{value}
+
+ ) +} + +function InstanceCard({ instance }: { instance: FleetDashboardInstance }) { + const style = STATE_STYLES[instance.state] + const approvalTone = instance.pending_approvals > 0 ? 'warning' : 'muted' + const proposalTone = instance.trust_proposals > 0 ? 'warning' : 'muted' + const budgetTone = instance.budget_threshold === 'normal' + ? 'healthy' + : instance.budget_threshold === 'warning' || instance.budget_threshold === 'spawn_restricted' + ? 'warning' + : 'error' + const commandCenterUrl = instance.command_center_url || (instance.is_self ? '/war-room/command' : '') + + return ( +
+
+
+
+ +

{instance.name}

+
+
+ {instance.instance_short_id || 'local'} +
+
+
+ {style.label} + + {instance.is_self ? 'SELF' : roleLabel(instance.role)} + +
+
+ +
+ + + + + + +
+ +
+
+ Soul + + {compactHash(instance.soul_version_hash)} + +
+
+
+
+
+ +
+
+ Latest Activity +
+ {instance.recent_activity ? ( +

+ {instance.recent_activity} + {instance.recent_activity_at && ( + ({formatRelativeTime(instance.recent_activity_at)}) + )} +

+ ) : ( +

No recent activity

+ )} +
+ + {instance.attention_reasons.length > 0 && ( +
+
+ Needs Attention +
+ {instance.attention_reasons.slice(0, 4).map((reason) => ( +
+ {reason} +
+ ))} +
+ )} + + +
+ ) +} + +function ApprovalsTable({ + approvals, + onDecision, +}: { + approvals: FleetApproval[] + onDecision: (target: DecisionTarget) => void +}) { + if (approvals.length === 0) { + return ( +
+

Unified Approval Queue

+

No pending approvals

+
+ ) + } + + return ( +
+

Unified Approval Queue

+
+ + + + + + + + + + + + + {approvals.map((item) => ( + + + + + + + + + ))} + +
InstanceActionTierCapabilityWaitingActions
{item.instance_name} +
{item.action_name}
+ {item.context && ( +
+ {item.context} +
+ )} +
+ + {item.risk_tier || 'T3'} + + + {item.capability} + {formatRelativeTime(item.waiting_since || item.created_at)} +
+ + +
+
+
+
+ ) +} + +function TrustTable({ + proposals, + onDecision, +}: { + proposals: FleetTrustProposal[] + onDecision: (target: DecisionTarget) => void +}) { + if (proposals.length === 0) { + return ( +
+

Unified Trust Proposals

+

No pending trust proposals

+
+ ) + } + + return ( +
+

Unified Trust Proposals

+
+ + + + + + + + + + + + + {proposals.map((item) => ( + + + + + + + + + ))} + +
InstanceCapabilityCurrentProposedSuccessesActions
{item.instance_name}{item.capability}T{item.current_tier}T{item.proposed_tier}{item.consecutive_successes} +
+ + +
+
+
+
+ ) +} + +function ActivityFeed({ events }: { events: FleetActivityEvent[] }) { + return ( +
+

Fleet Activity

+ {events.length === 0 ? ( +

No recent fleet activity

+ ) : ( +
+ {events.slice(0, 40).map((event) => ( +
+
+
+
{event.description}
+
{event.instance_name} - {event.event_type}
+
+
+ {formatRelativeTime(event.timestamp)} +
+
+
+ ))} +
+ )} +
+ ) +} + +function DecisionDialog({ + target, + reason, + error, + submitting, + onReasonChange, + onCancel, + onConfirm, +}: { + target: DecisionTarget | null + reason: string + error: Error | null + submitting: boolean + onReasonChange: (value: string) => void + onCancel: () => void + onConfirm: () => void +}) { + if (!target) return null + + const label = target.decision === 'approve' ? 'Approve' : 'Deny' + const confirmClass = target.decision === 'approve' + ? 'border-state-healthy/70 text-state-healthy hover:bg-state-healthy/10' + : 'border-state-error/70 text-state-error hover:bg-state-error/10' + + return ( +
+
+
+

{label} Decision

+

+ {target.title} +

+

{target.subtitle}

+
+ + +