diff --git a/backend/.env.example b/backend/.env.example index 84163b8..44bdff1 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -30,6 +30,7 @@ DATABASE_URL=postgresql://pulsartrack:pulsartrack_dev_password@localhost:5432/pu STELLAR_NETWORK=testnet SOROBAN_RPC_URL=https://soroban-testnet.stellar.org HORIZON_URL=https://horizon-testnet.stellar.org +SIMULATION_ACCOUNT=GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN # Account used for read-only contract simulations # Contract IDs (deploy with scripts/deploy.sh, then fill these in) CONTRACT_AD_REGISTRY= diff --git a/backend/src/index.ts b/backend/src/index.ts index f2e3c53..cec64c2 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -18,6 +18,7 @@ import { checkDbConnection } from "./config/database"; import { validateContractIds } from "./config/stellar"; import prisma from "./db/prisma"; import redisClient from "./config/redis"; +import { validateSimulationAccount } from "./services/soroban-client"; const app = express(); const PORT = parseInt(process.env.PORT || "4000", 10); @@ -85,6 +86,9 @@ setupWebSocketServer(server); async function start() { // Validate contract IDs — throws in production, warns in development validateContractIds(); + + // Validate simulation account + await validateSimulationAccount(); // Verify database connection — fail hard in production const dbOk = await checkDbConnection(); diff --git a/backend/src/services/horizon.ts b/backend/src/services/horizon.ts index 75e7d96..cb2a38a 100644 --- a/backend/src/services/horizon.ts +++ b/backend/src/services/horizon.ts @@ -45,22 +45,50 @@ export async function getAccountTransactions(address: string, limit = 20) { } /** - * Stream ledger events for contract activity + * Stream ledger events for contract activity with automatic reconnection */ export function streamLedgers( onLedger: (ledger: any) => void, onError?: (err: any) => void ): () => void { const server = createHorizonServer(); - const es = server - .ledgers() - .cursor('now') - .stream({ - onmessage: onLedger, - onerror: onError, - }); + let reconnectDelay = 1000; + let isClosed = false; + let es: any = null; - return () => (es as any)?.close?.(); + function connect() { + if (isClosed) return; + + es = server + .ledgers() + .cursor('now') + .stream({ + onmessage: (ledger) => { + reconnectDelay = 1000; // Reset on success + onLedger(ledger); + }, + onerror: (err) => { + logger.error({ err }, '[Horizon] Stream error, reconnecting...'); + es?.close?.(); + + if (onError) { + onError(err); + } + + if (!isClosed) { + setTimeout(connect, reconnectDelay); + reconnectDelay = Math.min(reconnectDelay * 2, 30000); + } + }, + }); + } + + connect(); + + return () => { + isClosed = true; + es?.close?.(); + }; } /** diff --git a/backend/src/services/soroban-client.ts b/backend/src/services/soroban-client.ts index b10866c..e52a4c0 100644 --- a/backend/src/services/soroban-client.ts +++ b/backend/src/services/soroban-client.ts @@ -10,8 +10,28 @@ import { } from "@stellar/stellar-sdk"; import { STELLAR_REQUEST_TIMEOUT_MS, stellarConfig } from "../config/stellar"; +import { logger } from "../lib/logger"; + const SIMULATION_ACCOUNT = - "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN"; + process.env.SIMULATION_ACCOUNT || "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN"; + +/** + * Validate that the simulation account exists and is funded on the target network. + * In production, we fail hard if it doesn't exist. + */ +export async function validateSimulationAccount() { + const server = getServer(); + try { + await server.getAccount(SIMULATION_ACCOUNT); + logger.info(`[Soroban] Simulation account ${SIMULATION_ACCOUNT} validated`); + } catch (err) { + logger.error(`[Soroban] Simulation account ${SIMULATION_ACCOUNT} not found or unfunded. Read-only calls will fail.`); + if (process.env.NODE_ENV === "production") { + logger.fatal("[Soroban] Aborting due to missing simulation account in production"); + process.exit(1); + } + } +} export function getServer(): rpc.Server { return new rpc.Server(stellarConfig.sorobanRpcUrl, { diff --git a/backend/src/services/websocket-server.ts b/backend/src/services/websocket-server.ts index 5c204ac..4fbd598 100644 --- a/backend/src/services/websocket-server.ts +++ b/backend/src/services/websocket-server.ts @@ -33,11 +33,7 @@ const clients = new Map(); const connectionsPerIp = new Map(); const MAX_CONNECTIONS_PER_IP = 5; -const INITIAL_BACKOFF_MS = 1000; -const MAX_BACKOFF_MS = 30000; -let currentBackoff = INITIAL_BACKOFF_MS; let stopStream: (() => void) | null = null; -let reconnectTimer: ReturnType | null = null; const JWT_SECRET = process.env.JWT_SECRET || crypto.randomBytes(32).toString("hex"); @@ -84,7 +80,6 @@ function parseClientMessage(raw: string): ClientMessage | null { function startLedgerStream(): void { stopStream = streamLedgers( (ledger) => { - currentBackoff = INITIAL_BACKOFF_MS; broadcastToChannel("ledger", { type: "LEDGER_CLOSED", payload: { @@ -96,42 +91,18 @@ function startLedgerStream(): void { }); }, (err: any) => { - logger.error(err, "[WS] Ledger stream error"); - scheduleReconnect(); + logger.error(err, "[WS] Ledger stream error notified"); + broadcastToChannel("ledger", { + type: "reconnecting", + payload: { + message: "Horizon stream dropped, reconnecting build-in...", + }, + timestamp: Date.now(), + }); }, ); } -function scheduleReconnect(): void { - if (reconnectTimer) return; - - broadcastToChannel("ledger", { - type: "reconnecting", - payload: { - message: "Horizon stream dropped, reconnecting...", - retryMs: currentBackoff, - }, - timestamp: Date.now(), - }); - - logger.info(`[WS] Reconnecting in ${currentBackoff}ms...`); - - reconnectTimer = setTimeout(() => { - reconnectTimer = null; - if (stopStream) { - try { stopStream(); } catch { /* already closed */ } - stopStream = null; - } - startLedgerStream(); - broadcastToChannel("ledger", { - type: "reconnected", - payload: { message: "Horizon stream resumed" }, - timestamp: Date.now(), - }); - currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS); - }, currentBackoff); -} - export function setupWebSocketServer(server: Server): WebSocketServer { const wss = new WebSocketServer({ server, path: "/ws" }); @@ -231,7 +202,6 @@ export function setupWebSocketServer(server: Server): WebSocketServer { startLedgerStream(); wss.on("close", () => { - if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } if (stopStream) { stopStream(); stopStream = null; } }); diff --git a/contracts/campaign-orchestrator/src/lib.rs b/contracts/campaign-orchestrator/src/lib.rs index d12e789..fd844ae 100644 --- a/contracts/campaign-orchestrator/src/lib.rs +++ b/contracts/campaign-orchestrator/src/lib.rs @@ -376,6 +376,22 @@ impl CampaignOrchestratorContract { if campaign.current_views >= campaign.target_views { campaign.status = CampaignStatus::Completed; + + // Decrement active campaigns count for advertiser + let stats_key = DataKey::AdvertiserStats(campaign.advertiser.clone()); + if let Some(mut stats) = env + .storage() + .persistent() + .get::(&stats_key) + { + stats.active_campaigns = stats.active_campaigns.saturating_sub(1); + env.storage().persistent().set(&stats_key, &stats); + env.storage().persistent().extend_ttl( + &stats_key, + PERSISTENT_LIFETIME_THRESHOLD, + PERSISTENT_BUMP_AMOUNT, + ); + } } let _ttl_key = DataKey::Campaign(campaign_id); @@ -433,6 +449,27 @@ impl CampaignOrchestratorContract { panic!("unauthorized"); } + // Only decrement stats if it was active + match campaign.status { + CampaignStatus::Active => { + let stats_key = DataKey::AdvertiserStats(advertiser.clone()); + if let Some(mut stats) = env + .storage() + .persistent() + .get::(&stats_key) + { + stats.active_campaigns = stats.active_campaigns.saturating_sub(1); + env.storage().persistent().set(&stats_key, &stats); + env.storage().persistent().extend_ttl( + &stats_key, + PERSISTENT_LIFETIME_THRESHOLD, + PERSISTENT_BUMP_AMOUNT, + ); + } + } + _ => {} + } + campaign.status = CampaignStatus::Paused; campaign.last_updated = env.ledger().timestamp(); let _ttl_key = DataKey::Campaign(campaign_id); @@ -461,6 +498,27 @@ impl CampaignOrchestratorContract { panic!("unauthorized"); } + // Only increment stats if it was paused + match campaign.status { + CampaignStatus::Paused => { + let stats_key = DataKey::AdvertiserStats(advertiser.clone()); + if let Some(mut stats) = env + .storage() + .persistent() + .get::(&stats_key) + { + stats.active_campaigns += 1; + env.storage().persistent().set(&stats_key, &stats); + env.storage().persistent().extend_ttl( + &stats_key, + PERSISTENT_LIFETIME_THRESHOLD, + PERSISTENT_BUMP_AMOUNT, + ); + } + } + _ => panic!("campaign not paused"), + } + campaign.status = CampaignStatus::Active; campaign.last_updated = env.ledger().timestamp(); let _ttl_key = DataKey::Campaign(campaign_id); diff --git a/frontend/src/hooks/useAnalytics.ts b/frontend/src/hooks/useAnalytics.ts index bd25a13..0da519c 100644 --- a/frontend/src/hooks/useAnalytics.ts +++ b/frontend/src/hooks/useAnalytics.ts @@ -46,7 +46,7 @@ export function useAnalyticsTimeseries({ campaignIds, timeframe }: UseAnalyticsT return () => { controller.abort(); }; - }, [campaignIds, timeframe]); + }, [campaignIds.join(','), timeframe]); return { data, loading, error }; }