Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ DATABASE_URL=postgresql://pulsartrack:pulsartrack_dev_password@localhost:5432/pu
STELLAR_NETWORK=testnet
SOROBAN_RPC_URL=https://soroban-testnet.stellar.org
HORIZON_URL=https://horizon-testnet.stellar.org
SIMULATION_ACCOUNT=GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN # Account used for read-only contract simulations

# Contract IDs (deploy with scripts/deploy.sh, then fill these in)
CONTRACT_AD_REGISTRY=
Expand Down
4 changes: 4 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { checkDbConnection } from "./config/database";
import { validateContractIds } from "./config/stellar";
import prisma from "./db/prisma";
import redisClient from "./config/redis";
import { validateSimulationAccount } from "./services/soroban-client";

const app = express();
const PORT = parseInt(process.env.PORT || "4000", 10);
Expand Down Expand Up @@ -85,6 +86,9 @@ setupWebSocketServer(server);
async function start() {
// Validate contract IDs — throws in production, warns in development
validateContractIds();

// Validate simulation account
await validateSimulationAccount();

// Verify database connection — fail hard in production
const dbOk = await checkDbConnection();
Expand Down
46 changes: 37 additions & 9 deletions backend/src/services/horizon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,50 @@ export async function getAccountTransactions(address: string, limit = 20) {
}

/**
* Stream ledger events for contract activity
* Stream ledger events for contract activity with automatic reconnection
*/
export function streamLedgers(
onLedger: (ledger: any) => void,
onError?: (err: any) => void
): () => void {
const server = createHorizonServer();
const es = server
.ledgers()
.cursor('now')
.stream({
onmessage: onLedger,
onerror: onError,
});
let reconnectDelay = 1000;
let isClosed = false;
let es: any = null;

return () => (es as any)?.close?.();
function connect() {
if (isClosed) return;

es = server
.ledgers()
.cursor('now')
.stream({
onmessage: (ledger) => {
reconnectDelay = 1000; // Reset on success
onLedger(ledger);
},
onerror: (err) => {
logger.error({ err }, '[Horizon] Stream error, reconnecting...');
es?.close?.();

if (onError) {
onError(err);
}

if (!isClosed) {
setTimeout(connect, reconnectDelay);
reconnectDelay = Math.min(reconnectDelay * 2, 30000);
}
},
});
}

connect();

return () => {
isClosed = true;
es?.close?.();
};
}

/**
Expand Down
22 changes: 21 additions & 1 deletion backend/src/services/soroban-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,28 @@ import {
} from "@stellar/stellar-sdk";
import { STELLAR_REQUEST_TIMEOUT_MS, stellarConfig } from "../config/stellar";

import { logger } from "../lib/logger";

const SIMULATION_ACCOUNT =
"GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN";
process.env.SIMULATION_ACCOUNT || "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN";

/**
* Validate that the simulation account exists and is funded on the target network.
* In production, we fail hard if it doesn't exist.
*/
export async function validateSimulationAccount() {
const server = getServer();
try {
await server.getAccount(SIMULATION_ACCOUNT);
logger.info(`[Soroban] Simulation account ${SIMULATION_ACCOUNT} validated`);
} catch (err) {
logger.error(`[Soroban] Simulation account ${SIMULATION_ACCOUNT} not found or unfunded. Read-only calls will fail.`);
if (process.env.NODE_ENV === "production") {
logger.fatal("[Soroban] Aborting due to missing simulation account in production");
process.exit(1);
}
}
}

export function getServer(): rpc.Server {
return new rpc.Server(stellarConfig.sorobanRpcUrl, {
Expand Down
46 changes: 8 additions & 38 deletions backend/src/services/websocket-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ const clients = new Map<WebSocket, ClientState>();
const connectionsPerIp = new Map<string, number>();
const MAX_CONNECTIONS_PER_IP = 5;

const INITIAL_BACKOFF_MS = 1000;
const MAX_BACKOFF_MS = 30000;
let currentBackoff = INITIAL_BACKOFF_MS;
let stopStream: (() => void) | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;

const JWT_SECRET =
process.env.JWT_SECRET || crypto.randomBytes(32).toString("hex");
Expand Down Expand Up @@ -84,7 +80,6 @@ function parseClientMessage(raw: string): ClientMessage | null {
function startLedgerStream(): void {
stopStream = streamLedgers(
(ledger) => {
currentBackoff = INITIAL_BACKOFF_MS;
broadcastToChannel("ledger", {
type: "LEDGER_CLOSED",
payload: {
Expand All @@ -96,42 +91,18 @@ function startLedgerStream(): void {
});
},
(err: any) => {
logger.error(err, "[WS] Ledger stream error");
scheduleReconnect();
logger.error(err, "[WS] Ledger stream error notified");
broadcastToChannel("ledger", {
type: "reconnecting",
payload: {
message: "Horizon stream dropped, reconnecting build-in...",
},
timestamp: Date.now(),
});
},
);
}

function scheduleReconnect(): void {
if (reconnectTimer) return;

broadcastToChannel("ledger", {
type: "reconnecting",
payload: {
message: "Horizon stream dropped, reconnecting...",
retryMs: currentBackoff,
},
timestamp: Date.now(),
});

logger.info(`[WS] Reconnecting in ${currentBackoff}ms...`);

reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (stopStream) {
try { stopStream(); } catch { /* already closed */ }
stopStream = null;
}
startLedgerStream();
broadcastToChannel("ledger", {
type: "reconnected",
payload: { message: "Horizon stream resumed" },
timestamp: Date.now(),
});
currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS);
}, currentBackoff);
}

export function setupWebSocketServer(server: Server): WebSocketServer {
const wss = new WebSocketServer({ server, path: "/ws" });

Expand Down Expand Up @@ -231,7 +202,6 @@ export function setupWebSocketServer(server: Server): WebSocketServer {
startLedgerStream();

wss.on("close", () => {
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
if (stopStream) { stopStream(); stopStream = null; }
});

Expand Down
58 changes: 58 additions & 0 deletions contracts/campaign-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,22 @@ impl CampaignOrchestratorContract {

if campaign.current_views >= campaign.target_views {
campaign.status = CampaignStatus::Completed;

// Decrement active campaigns count for advertiser
let stats_key = DataKey::AdvertiserStats(campaign.advertiser.clone());
if let Some(mut stats) = env
.storage()
.persistent()
.get::<DataKey, AdvertiserStats>(&stats_key)
{
stats.active_campaigns = stats.active_campaigns.saturating_sub(1);
env.storage().persistent().set(&stats_key, &stats);
env.storage().persistent().extend_ttl(
&stats_key,
PERSISTENT_LIFETIME_THRESHOLD,
PERSISTENT_BUMP_AMOUNT,
);
}
}

let _ttl_key = DataKey::Campaign(campaign_id);
Expand Down Expand Up @@ -433,6 +449,27 @@ impl CampaignOrchestratorContract {
panic!("unauthorized");
}

// Only decrement stats if it was active
match campaign.status {
CampaignStatus::Active => {
let stats_key = DataKey::AdvertiserStats(advertiser.clone());
if let Some(mut stats) = env
.storage()
.persistent()
.get::<DataKey, AdvertiserStats>(&stats_key)
{
stats.active_campaigns = stats.active_campaigns.saturating_sub(1);
env.storage().persistent().set(&stats_key, &stats);
env.storage().persistent().extend_ttl(
&stats_key,
PERSISTENT_LIFETIME_THRESHOLD,
PERSISTENT_BUMP_AMOUNT,
);
}
}
_ => {}
}

campaign.status = CampaignStatus::Paused;
campaign.last_updated = env.ledger().timestamp();
let _ttl_key = DataKey::Campaign(campaign_id);
Expand Down Expand Up @@ -461,6 +498,27 @@ impl CampaignOrchestratorContract {
panic!("unauthorized");
}

// Only increment stats if it was paused
match campaign.status {
CampaignStatus::Paused => {
let stats_key = DataKey::AdvertiserStats(advertiser.clone());
if let Some(mut stats) = env
.storage()
.persistent()
.get::<DataKey, AdvertiserStats>(&stats_key)
{
stats.active_campaigns += 1;
env.storage().persistent().set(&stats_key, &stats);
env.storage().persistent().extend_ttl(
&stats_key,
PERSISTENT_LIFETIME_THRESHOLD,
PERSISTENT_BUMP_AMOUNT,
);
}
}
_ => panic!("campaign not paused"),
}

campaign.status = CampaignStatus::Active;
campaign.last_updated = env.ledger().timestamp();
let _ttl_key = DataKey::Campaign(campaign_id);
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/hooks/useAnalytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function useAnalyticsTimeseries({ campaignIds, timeframe }: UseAnalyticsT
return () => {
controller.abort();
};
}, [campaignIds, timeframe]);
}, [campaignIds.join(','), timeframe]);

return { data, loading, error };
}
Loading