From 7ed77a506a2fed74cbd5da22b6a728e20ef287bf Mon Sep 17 00:00:00 2001 From: Timidan Date: Thu, 2 Apr 2026 06:58:24 +0100 Subject: [PATCH] feat: add support for 'skipped' result in decision log and dashboard snapshot --- dashboard/src/components/CurrentDecision.tsx | 2 + dashboard/src/components/DecisionLog.tsx | 2 + src/loop/index.ts | 278 ++++++++++++++----- src/session/index.ts | 24 +- src/types/dashboard.ts | 2 +- src/ws/index.ts | 35 ++- 6 files changed, 266 insertions(+), 77 deletions(-) diff --git a/dashboard/src/components/CurrentDecision.tsx b/dashboard/src/components/CurrentDecision.tsx index 1e8371c..0dc9f8c 100644 --- a/dashboard/src/components/CurrentDecision.tsx +++ b/dashboard/src/components/CurrentDecision.tsx @@ -110,6 +110,8 @@ export function CurrentDecision({ snapshot }: CurrentDecisionProps) { ? "#22c55e" : latest.result === "blocked" ? "#ef4444" + : latest.result === "skipped" + ? "#f59e0b" : "#71717a", }} > diff --git a/dashboard/src/components/DecisionLog.tsx b/dashboard/src/components/DecisionLog.tsx index 42a1bac..dfc02bd 100644 --- a/dashboard/src/components/DecisionLog.tsx +++ b/dashboard/src/components/DecisionLog.tsx @@ -43,6 +43,8 @@ function resultDisplay(entry: DecisionLogEntry): { text: string; className: stri return { text: "blocked", className: "red" }; case "dry-run": return { text: "dry-run", className: "blue" }; + case "skipped": + return { text: "skipped", className: "amber" }; case "hold": return { text: "hold", className: "" }; case "executed": diff --git a/src/loop/index.ts b/src/loop/index.ts index 0bae677..536e0e7 100644 --- a/src/loop/index.ts +++ b/src/loop/index.ts @@ -21,10 +21,9 @@ import { } from "../risk/index.js"; import { execute, - getUsdcBalance, createClients, - BASE_TOKENS, refreshPortfolio, + TRADE_VAULT_ADDRESS, } from "../executor/index.js"; import { buildReceipt, @@ -41,6 +40,7 @@ import { type ScoredAsset, type DashboardSnapshot, type DecisionLogEntry, + type RiskGateResult, ASSET_UNIVERSE, TESTNET_UNIVERSE, MurmurError, @@ -61,6 +61,7 @@ import { getNonceMessage, verifyAndCreateSession, getSession, + getAutopilotSessions, updateSettings, setAutopilot, type UserSession, @@ -115,7 +116,7 @@ let policy: RiskPolicy; let treasuryState: TreasuryState; let receipts: DecisionReceipt[] = []; let isRunning = false; -let lastResetDate = new Date().toUTCString().slice(0, 16); // "Day, DD Mon YYYY" +const lastResetDates = new Map(); let decisionLog: DecisionLogEntry[] = []; let lastScoredAssets: ScoredAsset[] = []; @@ -131,32 +132,90 @@ const SANTIMENT_REFRESH_MS = 30 * 60 * 1000; // ENS identity let ensResolution = createDefaultEnsResolution(DEFAULT_AGENT_WALLET); +interface LoopRuntimeState { + policy: RiskPolicy; + treasuryState: TreasuryState; + receipts: DecisionReceipt[]; + decisionLog: DecisionLogEntry[]; + lastScoredAssets: ScoredAsset[]; + lastRiskGate: RiskGateResult | null; + cycleCount: number; +} + +function getGlobalRuntime(): LoopRuntimeState { + return { + policy, + treasuryState, + receipts, + decisionLog, + lastScoredAssets, + lastRiskGate, + cycleCount, + }; +} + +function setGlobalRuntime(runtime: LoopRuntimeState): void { + policy = runtime.policy; + treasuryState = runtime.treasuryState; + receipts = runtime.receipts; + decisionLog = runtime.decisionLog; + lastScoredAssets = runtime.lastScoredAssets; + lastRiskGate = runtime.lastRiskGate; + cycleCount = runtime.cycleCount; +} + +function getSessionRuntime(session: UserSession): LoopRuntimeState { + return { + policy: session.policy, + treasuryState: session.treasuryState, + receipts: session.receipts, + decisionLog: session.decisionLog, + lastScoredAssets: session.lastScoredAssets, + lastRiskGate: session.lastRiskGate, + cycleCount: session.cycleCount, + }; +} + +function setSessionRuntime(session: UserSession, runtime: LoopRuntimeState): void { + session.policy = runtime.policy; + session.treasuryState = runtime.treasuryState; + session.receipts = runtime.receipts; + session.decisionLog = runtime.decisionLog; + session.lastScoredAssets = runtime.lastScoredAssets; + session.lastRiskGate = runtime.lastRiskGate; + session.cycleCount = runtime.cycleCount; +} + function buildSnapshot( config: ReturnType, + runtime: LoopRuntimeState, currentCycle: DashboardSnapshot["currentCycle"] = null, ): DashboardSnapshot { return { - treasury: treasuryState ?? { + treasury: runtime.treasuryState ?? { usdcBalance: BigInt(0), totalPortfolioUsd: 0, positions: [], lastUpdatedAt: new Date().toISOString(), }, - scoredAssets: lastScoredAssets, - lastDecisions: decisionLog.slice(-20), - riskGate: lastRiskGate, + scoredAssets: runtime.lastScoredAssets, + lastDecisions: runtime.decisionLog.slice(-20), + riskGate: runtime.lastRiskGate, currentCycle, ethPrice: hasPriceData() ? getPriceState().currentPrice : null, - regime: detectRegime(lastScoredAssets), - latestFilecoinCid: receipts.length > 0 ? (receipts[receipts.length - 1]?.filecoinCid ?? null) : null, + regime: detectRegime(runtime.lastScoredAssets), + latestFilecoinCid: + runtime.receipts.length > 0 + ? (runtime.receipts[runtime.receipts.length - 1]?.filecoinCid ?? null) + : null, config: { network: "Base Sepolia", cronSchedule: config.cronSchedule, dryRun: config.dryRun, - maxNotionalUsd: config.delegationMaxNotionalUsd, - maxDailyTurnoverUsd: config.delegationDailyTurnoverUsd, + maxNotionalUsd: runtime.policy.maxNotionalUsd, + maxDailyTurnoverUsd: runtime.policy.maxDailyTurnoverUsd, }, - cycleCount, + cycleCount: runtime.cycleCount, uptimeSince, }; } @@ -171,13 +230,14 @@ function initPolicy(config: ReturnType): RiskPolicy { async function refreshTreasuryState( config: ReturnType, + previousTreasury: TreasuryState | null, session?: UserSession | null, ): Promise { if (!config.agentPrivateKey || !config.agentAddress) { return { usdcBalance: BigInt(1000 * 1e6), totalPortfolioUsd: 1000, - positions: treasuryState?.positions ?? [], + positions: previousTreasury?.positions ?? [], lastUpdatedAt: new Date().toISOString(), }; } @@ -188,8 +248,8 @@ async function refreshTreasuryState( privateKey: config.agentPrivateKey, }); - // Use the caller's vault if available, fall back to agent address let portfolioOwner = config.agentAddress; + let vaultAddress = TRADE_VAULT_ADDRESS; if (session) { try { const vaultAddr = await publicClient.readContract({ @@ -200,17 +260,37 @@ async function refreshTreasuryState( }); if (vaultAddr && vaultAddr !== "0x0000000000000000000000000000000000000000") { portfolioOwner = vaultAddr as `0x${string}`; + vaultAddress = vaultAddr as `0x${string}`; + } else { + return { + usdcBalance: BigInt(0), + totalPortfolioUsd: 0, + positions: [], + lastUpdatedAt: new Date().toISOString(), + }; } - } catch {} + } catch (err) { + console.warn( + `[Loop] Could not resolve vault for ${session.ownerAddress}: ${(err as Error).message}`, + ); + return ( + previousTreasury ?? { + usdcBalance: BigInt(0), + totalPortfolioUsd: 0, + positions: [], + lastUpdatedAt: new Date().toISOString(), + } + ); + } } - return await refreshPortfolio(publicClient, portfolioOwner); + return await refreshPortfolio(publicClient, portfolioOwner, vaultAddress); } catch (err) { console.warn( `[Loop] Could not refresh treasury state: ${(err as Error).message}`, ); return ( - treasuryState ?? { + previousTreasury ?? { usdcBalance: BigInt(0), totalPortfolioUsd: 0, positions: [], @@ -241,13 +321,17 @@ function separator(label?: string) { // ─── Daily Reset ─────────────────────────────────────────────────────────────── -function checkDailyReset() { +function checkDailyReset(currentPolicy: RiskPolicy, scopeKey: string): RiskPolicy { const currentDate = new Date().toUTCString().slice(0, 16); + const lastResetDate = lastResetDates.get(scopeKey); if (currentDate !== lastResetDate) { - log("idle", "Daily reset — resetting turnover counters"); - policy = resetDailyTurnover(policy); - lastResetDate = currentDate; + if (lastResetDate) { + log("idle", "Daily reset — resetting turnover counters"); + } + lastResetDates.set(scopeKey, currentDate); + return resetDailyTurnover(currentPolicy); } + return currentPolicy; } // ─── Phase: Sense ───────────────────────────────────────────────────────────── @@ -330,6 +414,12 @@ function detectRegime(scored: ScoredAsset[]): "bullish" | "bearish" | "neutral" // ─── Price-Driven Trading Decision ─────────────────────────────────────────── +// `momentum1m` and `momentum5m` are already expressed in percentage points, +// so thresholds here are also percentages, not decimal fractions. +const PRICE_BREAKOUT_1M_THRESHOLD_PCT = 0.15; +const PRICE_BREAKOUT_5M_THRESHOLD_PCT = 0.10; +const REGIME_MOMENTUM_THRESHOLD_PCT = 0.05; + function shouldTradeOnPrice( priceState: PriceFeedState, regime: "bullish" | "bearish" | "neutral", @@ -339,8 +429,8 @@ function shouldTradeOnPrice( // BUY: price breaking 5m high + positive momentum + not bearish regime if ( regime !== "bearish" && - momentum1m > 0.03 && - momentum5m > 0.02 && + momentum1m > PRICE_BREAKOUT_1M_THRESHOLD_PCT && + momentum5m > PRICE_BREAKOUT_5M_THRESHOLD_PCT && currentPrice >= high5m * 0.999 ) { return { @@ -352,8 +442,8 @@ function shouldTradeOnPrice( // EXIT: price breaking 5m low + negative momentum if ( - momentum1m < -0.03 && - momentum5m < -0.02 && + momentum1m < -PRICE_BREAKOUT_1M_THRESHOLD_PCT && + momentum5m < -PRICE_BREAKOUT_5M_THRESHOLD_PCT && currentPrice <= low5m * 1.001 ) { return { @@ -364,7 +454,7 @@ function shouldTradeOnPrice( } // Bullish regime + any positive momentum → buy - if (regime === "bullish" && momentum1m > 0.01) { + if (regime === "bullish" && momentum1m > REGIME_MOMENTUM_THRESHOLD_PCT) { return { action: "buy", confidence: 0.45, @@ -412,14 +502,15 @@ async function runDeliberate( function runRiskGatePhase( deliberationResult: Awaited>, scored: ScoredAsset[], + runtime: LoopRuntimeState, ) { const decision = deliberationResult.decision; const scoredAsset = scored.find((s) => s.slug === decision.slug); const result = runRiskGate({ decision, - treasury: treasuryState, - policy, + treasury: runtime.treasuryState, + policy: runtime.policy, scoredAsset, }); @@ -433,6 +524,7 @@ async function runExecute( deliberationResult: Awaited>, riskGateResult: ReturnType, config: ReturnType, + runtime: LoopRuntimeState, session?: UserSession | null, ) { const { decision } = deliberationResult; @@ -449,7 +541,7 @@ async function runExecute( // Guard: can't reduce/exit a position we don't hold if (decision.action === "reduce" || decision.action === "exit") { - const hasPosition = treasuryState.positions.some( + const hasPosition = runtime.treasuryState.positions.some( (p) => p.slug === decision.slug, ); if (!hasPosition) { @@ -510,9 +602,6 @@ async function runExecute( log("execute", `✅ Swap confirmed — tx: ${result.txHash}`); log("execute", ` BaseScan: https://sepolia.basescan.org/tx/${result.txHash}`); - // Update policy counters - policy = updatePolicyAfterTrade(policy, riskGateResult.effectiveSizeUsd); - return result; } @@ -525,6 +614,7 @@ async function runAttest( riskGateResult: ReturnType, executionResult: Awaited>, config: ReturnType, + runtime: LoopRuntimeState, ): Promise { if (!config.agentAddress) { throw new MurmurError( @@ -533,8 +623,11 @@ async function runAttest( ); } - const capBefore = policy.maxDelegationSpendUsd - policy.delegationSpentUsd; - const capAfter = capBefore - riskGateResult.effectiveSizeUsd; + const capBefore = + runtime.policy.maxDelegationSpendUsd - runtime.policy.delegationSpentUsd; + const capAfter = executionResult + ? capBefore - riskGateResult.effectiveSizeUsd + : capBefore; const receipt = buildReceipt({ cycleId, @@ -561,11 +654,11 @@ async function runAttest( console.log(summarizeNotarization(notarized)); - receipts.push(notarized.receipt); + runtime.receipts.push(notarized.receipt); // Keep only last 100 receipts in memory - if (receipts.length > 100) { - receipts = receipts.slice(-100); + if (runtime.receipts.length > 100) { + runtime.receipts = runtime.receipts.slice(-100); } return notarized.receipt; @@ -578,6 +671,9 @@ async function runCycle( triggeredBy: LoopCycle["triggeredBy"] = "cron", session?: UserSession | null, ): Promise { + const runtime = session ? getSessionRuntime(session) : getGlobalRuntime(); + const resetScope = session?.token ?? "__global__"; + const broadcastToken = session?.token; const cycleId = randomUUID(); const startedAt = new Date().toISOString(); @@ -592,15 +688,19 @@ async function runCycle( try { // ── 1. Daily reset check - checkDailyReset(); + runtime.policy = checkDailyReset(runtime.policy, resetScope); // ── 2. Refresh treasury + revalue positions with Binance price - treasuryState = await refreshTreasuryState(config, session); + runtime.treasuryState = await refreshTreasuryState( + config, + runtime.treasuryState, + session, + ); if (hasPriceData()) { const ethPrice = getPriceState().currentPrice; - const usdcUsd = Number(treasuryState.usdcBalance) / 1e6; + const usdcUsd = Number(runtime.treasuryState.usdcBalance) / 1e6; let positionsUsd = 0; - for (const pos of treasuryState.positions) { + for (const pos of runtime.treasuryState.positions) { if (pos.slug === "ethereum" || pos.slug === "weth") { const amt = Number(pos.amountHeld) / 1e18; const val = amt * ethPrice; @@ -610,27 +710,27 @@ async function runCycle( positionsUsd += pos.usdValueAtEntry; } } - treasuryState.totalPortfolioUsd = usdcUsd + positionsUsd; + runtime.treasuryState.totalPortfolioUsd = usdcUsd + positionsUsd; } log( "sense", - `Treasury: $${treasuryState.totalPortfolioUsd.toFixed(2)} total | USDC: $${(Number(treasuryState.usdcBalance) / 1e6).toFixed(2)} | ${treasuryState.positions.length} position(s)`, + `Treasury: $${runtime.treasuryState.totalPortfolioUsd.toFixed(2)} total | USDC: $${(Number(runtime.treasuryState.usdcBalance) / 1e6).toFixed(2)} | ${runtime.treasuryState.positions.length} position(s)`, ); // ── 3. Santiment (cached, refreshed every 30 min) cycle.phase = "sense"; - broadcastPhase(cycleId, "sense", new Date().toISOString()); + broadcastPhase(cycleId, "sense", new Date().toISOString(), broadcastToken); const universe = await refreshSantimentCache(config); // ── 4. Normalize + Score cycle.phase = "normalize"; - broadcastPhase(cycleId, "normalize", new Date().toISOString()); + broadcastPhase(cycleId, "normalize", new Date().toISOString(), broadcastToken); const { scored, candidates } = runNormalizeAndScore( universe, config.candidateTopN, ); - lastScoredAssets = scored; - broadcastPhase(cycleId, "score", new Date().toISOString()); + runtime.lastScoredAssets = scored; + broadcastPhase(cycleId, "score", new Date().toISOString(), broadcastToken); // ── 5. Detect regime from Santiment const regime = detectRegime(scored); @@ -639,7 +739,7 @@ async function runCycle( // ── 6. Price-driven deliberation (dual-lane) const externalContext: string | null = null; cycle.phase = "deliberate"; - broadcastPhase(cycleId, "deliberate", new Date().toISOString()); + broadcastPhase(cycleId, "deliberate", new Date().toISOString(), broadcastToken); let finalDeliberation; const priceState = getPriceState(); @@ -648,8 +748,14 @@ async function runCycle( log("deliberate", `ETH/USD: $${priceState.currentPrice.toFixed(2)} | 1m: ${priceState.momentum1m.toFixed(3)}% | 5m: ${priceState.momentum5m.toFixed(3)}%`); const priceTrigger = shouldTradeOnPrice(priceState, regime); + const hasEthPosition = runtime.treasuryState.positions.some( + (position) => position.slug === "ethereum" || position.slug === "weth", + ); - if (priceTrigger.action !== "hold") { + if (priceTrigger.action === "exit" && !hasEthPosition) { + log("deliberate", "Ignoring EXIT price trigger — no ETH position is open"); + finalDeliberation = await runDeliberate(candidates, cycleId, config, false, externalContext); + } else if (priceTrigger.action !== "hold") { log("deliberate", `Price trigger: ${priceTrigger.action.toUpperCase()} — ${priceTrigger.reason}`); // Use Venice AI to narrate/validate @@ -685,9 +791,9 @@ async function runCycle( // ── 7. Risk gate cycle.phase = "risk_gate"; - broadcastPhase(cycleId, "risk_gate", new Date().toISOString()); - const riskGateResult = runRiskGatePhase(finalDeliberation, scored); - lastRiskGate = riskGateResult; + broadcastPhase(cycleId, "risk_gate", new Date().toISOString(), broadcastToken); + const riskGateResult = runRiskGatePhase(finalDeliberation, scored, runtime); + runtime.lastRiskGate = riskGateResult; // ── 8. Quote (embedded in execute) cycle.phase = "quote"; @@ -695,18 +801,26 @@ async function runCycle( // ── 9. Execute cycle.phase = "execute"; - broadcastPhase(cycleId, "execute", new Date().toISOString()); + broadcastPhase(cycleId, "execute", new Date().toISOString(), broadcastToken); const executionResult = await runExecute( finalDeliberation, riskGateResult, config, + runtime, session, ); // ── 9.5 Refresh treasury after trade if (executionResult) { - treasuryState = await refreshTreasuryState(config, session); - log("execute", `Treasury refreshed — $${treasuryState.totalPortfolioUsd.toFixed(2)} total | ${treasuryState.positions.length} positions`); + runtime.treasuryState = await refreshTreasuryState( + config, + runtime.treasuryState, + session, + ); + log( + "execute", + `Treasury refreshed — $${runtime.treasuryState.totalPortfolioUsd.toFixed(2)} total | ${runtime.treasuryState.positions.length} positions`, + ); } // ── 10. Attest (skip if no agent address configured) @@ -720,11 +834,19 @@ async function runCycle( riskGateResult, executionResult, config, + runtime, ); } else { log("attest", "No AGENT_ADDRESS configured — skipping attestation"); } + if (executionResult) { + runtime.policy = updatePolicyAfterTrade( + runtime.policy, + riskGateResult.effectiveSizeUsd, + ); + } + cycle.phase = "idle"; cycle.completedAt = new Date().toISOString(); cycle.receipt = receipt; @@ -738,7 +860,7 @@ async function runCycle( separator(); // ── Dashboard: log decision and broadcast snapshot - cycleCount++; + runtime.cycleCount++; const logEntry: DecisionLogEntry = { cycleId, timestamp: new Date().toISOString(), @@ -748,15 +870,31 @@ async function runCycle( confidence: finalDeliberation.decision.confidence, riskApproved: riskGateResult.approved, effectiveSizeUsd: riskGateResult.effectiveSizeUsd, - result: executionResult ? "executed" : riskGateResult.approved ? (config.dryRun ? "dry-run" : "hold") : "blocked", + result: executionResult + ? "executed" + : !riskGateResult.approved + ? "blocked" + : finalDeliberation.decision.action === "hold" + ? "hold" + : config.dryRun + ? "dry-run" + : "skipped", pnlPct: null, filecoinCid: receipt?.filecoinCid ?? null, txHash: executionResult?.txHash ?? null, }; - decisionLog.push(logEntry); - if (decisionLog.length > 50) decisionLog = decisionLog.slice(-50); + runtime.decisionLog.push(logEntry); + if (runtime.decisionLog.length > 50) { + runtime.decisionLog = runtime.decisionLog.slice(-50); + } + + if (session) { + setSessionRuntime(session, runtime); + } else { + setGlobalRuntime(runtime); + } - broadcastSnapshot(buildSnapshot(config)); + broadcastSnapshot(buildSnapshot(config, runtime), broadcastToken); return cycle; } catch (err) { @@ -771,6 +909,12 @@ async function runCycle( console.error("[Loop] Cycle error details:", err); + if (session) { + setSessionRuntime(session, runtime); + } else { + setGlobalRuntime(runtime); + } + return cycle; } } @@ -860,7 +1004,7 @@ async function main() { await runStartupChecks(config); policy = initPolicy(config); - treasuryState = await refreshTreasuryState(config); + treasuryState = await refreshTreasuryState(config, null); registerSignalHandlers(); @@ -925,6 +1069,7 @@ async function main() { signature: signature as `0x${string}`, }); if (!session) { res.status(401).json({ error: "Invalid signature" }); return; } + broadcastSnapshot(buildSnapshot(config, getSessionRuntime(session)), session.token); res.json({ token: session.token, agentAddress: session.agentAddress, @@ -973,7 +1118,7 @@ async function main() { } // Broadcast an initial snapshot so clients don't stay on skeleton - broadcastSnapshot(buildSnapshot(config)); + broadcastSnapshot(buildSnapshot(config, getGlobalRuntime())); // Start x402 API server for paid data access startX402Server({ @@ -1021,7 +1166,14 @@ async function main() { isRunning = true; try { - await runCycle(config, "cron"); + const autopilotSessions = getAutopilotSessions(); + if (autopilotSessions.length === 0) { + await runCycle(config, "cron"); + } else { + for (const session of autopilotSessions) { + await runCycle(config, "cron", session); + } + } } finally { isRunning = false; } diff --git a/src/session/index.ts b/src/session/index.ts index 7ba27a6..cc16ed6 100644 --- a/src/session/index.ts +++ b/src/session/index.ts @@ -98,16 +98,28 @@ export async function verifyAndCreateSession(params: { } } - // Create new agent wallet for this user - const agentPrivateKey = generatePrivateKey(); + // Use the configured on-chain agent when available so the dashboard shows + // the address that can actually execute against deployed vaults. + const configuredAgentPrivateKey = process.env.AGENT_PRIVATE_KEY?.trim() ?? ""; + const configuredAgentAddress = process.env.AGENT_ADDRESS?.trim() ?? ""; + const hasConfiguredAgent = + configuredAgentPrivateKey.startsWith("0x") && + configuredAgentAddress.startsWith("0x"); + + const agentPrivateKey = hasConfiguredAgent + ? (configuredAgentPrivateKey as `0x${string}`) + : generatePrivateKey(); const agentAccount = privateKeyToAccount(agentPrivateKey); + const agentAddress = hasConfiguredAgent + ? (configuredAgentAddress as Address) + : agentAccount.address; const token = randomUUID(); const session: UserSession = { token, ownerAddress: address, agentPrivateKey, - agentAddress: agentAccount.address, + agentAddress, settings: { ...DEFAULT_SETTINGS }, autopilotEnabled: true, nonce, @@ -128,7 +140,7 @@ export async function verifyAndCreateSession(params: { }; sessions.set(token, session); - console.log(`[Session] New session for ${address} → agent wallet ${agentAccount.address}`); + console.log(`[Session] New session for ${address} → agent wallet ${agentAddress}`); return session; } @@ -182,6 +194,10 @@ export function setAutopilot(token: string, enabled: boolean): UserSession | nul return session; } +export function getAutopilotSessions(): UserSession[] { + return [...sessions.values()].filter((session) => session.autopilotEnabled); +} + function buildPolicyFromSettings(settings: UserSettings): RiskPolicy { const preset = RISK_PRESETS[settings.riskProfile]; return defaultPolicy({ diff --git a/src/types/dashboard.ts b/src/types/dashboard.ts index 796db14..0dc712d 100644 --- a/src/types/dashboard.ts +++ b/src/types/dashboard.ts @@ -16,7 +16,7 @@ export interface DecisionLogEntry { confidence: number; riskApproved: boolean; effectiveSizeUsd: number; - result: "executed" | "blocked" | "hold" | "dry-run"; + result: "executed" | "blocked" | "hold" | "dry-run" | "skipped"; pnlPct: number | null; filecoinCid?: string | null; txHash?: string | null; diff --git a/src/ws/index.ts b/src/ws/index.ts index 56c2daf..b88c9f3 100644 --- a/src/ws/index.ts +++ b/src/ws/index.ts @@ -8,6 +8,7 @@ import { getSession } from "../session/index.js"; let wss: WebSocketServer | null = null; let httpServer: Server | null = null; let latestSnapshot: DashboardSnapshot | null = null; +const latestSnapshotsByToken = new Map(); // Shared express app for HTTP endpoints on the same port const ALLOWED_ORIGINS = new Set( @@ -54,10 +55,13 @@ export function startWsServer(port: number = 3001): WebSocketServer { return; } + (ws as WebSocket & { __token?: string }).__token = token; + console.log(`[WS] Client connected (${wss!.clients.size} total)`); - if (latestSnapshot) { - send(ws, { type: "snapshot", data: latestSnapshot }); + const initialSnapshot = latestSnapshotsByToken.get(token) ?? latestSnapshot; + if (initialSnapshot) { + send(ws, { type: "snapshot", data: initialSnapshot }); } ws.on("close", () => { @@ -92,11 +96,15 @@ export function startWsServer(port: number = 3001): WebSocketServer { return wss; } -export function broadcast(message: WsMessage): void { +export function broadcast(message: WsMessage, token?: string): void { if (!wss) return; if (message.type === "snapshot") { - latestSnapshot = message.data; + if (token) { + latestSnapshotsByToken.set(token, message.data); + } else { + latestSnapshot = message.data; + } } const payload = JSON.stringify(message, (_key, value) => @@ -104,21 +112,30 @@ export function broadcast(message: WsMessage): void { ); for (const client of wss.clients) { - if (client.readyState === WebSocket.OPEN) { + const clientToken = (client as WebSocket & { __token?: string }).__token; + if ( + client.readyState === WebSocket.OPEN && + (!token || clientToken === token) + ) { client.send(payload); } } } -export function broadcastPhase(cycleId: string, phase: string, timestamp: string): void { +export function broadcastPhase( + cycleId: string, + phase: string, + timestamp: string, + token?: string, +): void { broadcast({ type: "phase", data: { cycleId, phase: phase as any, timestamp }, - }); + }, token); } -export function broadcastSnapshot(snapshot: DashboardSnapshot): void { - broadcast({ type: "snapshot", data: snapshot }); +export function broadcastSnapshot(snapshot: DashboardSnapshot, token?: string): void { + broadcast({ type: "snapshot", data: snapshot }, token); } export function stopWsServer(): void {