From 4ea51590b599094dfe10d8af283312ba55c212ec Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 30 Apr 2026 14:59:15 +0200 Subject: [PATCH 01/12] Add statement-latency-bench example --- examples/statement-latency-bench/.gitignore | 3 + examples/statement-latency-bench/README.md | 69 +++++ examples/statement-latency-bench/bench.js | 244 +++++++++++++++++ examples/statement-latency-bench/chainspec.js | 16 ++ examples/statement-latency-bench/client.js | 250 ++++++++++++++++++ examples/statement-latency-bench/keypair.js | 19 ++ examples/statement-latency-bench/package.json | 21 ++ .../statement-latency-bench/smoldot-rpc.js | 113 ++++++++ examples/statement-latency-bench/statement.js | 176 ++++++++++++ examples/statement-latency-bench/stats.js | 61 +++++ 10 files changed, 972 insertions(+) create mode 100644 examples/statement-latency-bench/.gitignore create mode 100644 examples/statement-latency-bench/README.md create mode 100755 examples/statement-latency-bench/bench.js create mode 100644 examples/statement-latency-bench/chainspec.js create mode 100644 examples/statement-latency-bench/client.js create mode 100644 examples/statement-latency-bench/keypair.js create mode 100644 examples/statement-latency-bench/package.json create mode 100644 examples/statement-latency-bench/smoldot-rpc.js create mode 100644 examples/statement-latency-bench/statement.js create mode 100644 examples/statement-latency-bench/stats.js diff --git a/examples/statement-latency-bench/.gitignore b/examples/statement-latency-bench/.gitignore new file mode 100644 index 0000000000..d8d980909e --- /dev/null +++ b/examples/statement-latency-bench/.gitignore @@ -0,0 +1,3 @@ +node_modules/ +package-lock.json +chain-specs/ diff --git a/examples/statement-latency-bench/README.md b/examples/statement-latency-bench/README.md new file mode 100644 index 0000000000..02b9d29591 --- /dev/null +++ b/examples/statement-latency-bench/README.md @@ -0,0 +1,69 @@ +# statement-latency-bench + +Smoldot-based Node.js port of `polkadot-sdk/substrate/client/statement-store/statement-latency-bench`. + +Each virtual client runs an in-process smoldot light client and joins the +statement-store libp2p network as its own peer. There is no `--rpc-endpoints`: +smoldot is a peer, not an RPC client of a node. + +## Build smoldot first + +```bash +cd ../../wasm-node/javascript +npm install && npm run build +cd - +npm install +``` + +## Usage + +```bash +node bench.js \ + --chain-spec ./chain-specs/parachain.json \ + --relay-chain-spec ./chain-specs/relay.json \ + --bootnodes /ip4/127.0.0.1/tcp/30333/ws/p2p/12D3KooW… \ + --num-clients 4 \ + --num-rounds 2 \ + --messages-pattern "3:128" \ + --receive-timeout-ms 30000 \ + --interval-ms 5000 +``` + +If `--chain-spec` is a parachain spec, pass `--relay-chain-spec` too. Bootnodes +in `--bootnodes` are appended to whatever is already in the spec's `bootNodes` +field. + +## Flags + +| Flag | Default | Notes | +|---|---|---| +| `--chain-spec` | required | Raw chain spec JSON path. | +| `--relay-chain-spec` | — | Required if the parachain spec references a relay. | +| `--bootnodes` | — | Comma-separated multiaddrs, appended to the spec. | +| `--false-positive-rate` | `0.01` | Statement-store affinity bloom filter rate. | +| `--num-clients` | `100` | Each spawns its own smoldot instance. | +| `--num-rounds` | `1` | | +| `--messages-pattern` | `5:512` | `count:size,count:size,…` | +| `--receive-timeout-ms` | `5000` | | +| `--interval-ms` | `10000` | Wall-clock pacing between rounds. | +| `--statement-expiry-ms` | `600000` | | +| `--warmup-ms` | `15000` | Fixed sleep after `addChain` before round 1. | +| `--fail-fast` | `false` | First failure aborts all clients via AbortController. | +| `--log-level` | `info` | error/warn/info/debug/trace | + +## Differences from `bench.rs` + +- **Connection model.** Rust connects to N WS endpoints; smoldot joins the + network directly. Bootnodes replace `--rpc-endpoints` as the wiring point. +- **One smoldot per client.** Smoldot dedupes within a client, so one peer per + client requires one `start()` per client. Each is ~tens of MB resident; for + N=100 expect multi-GB RAM. Use the same K8s-shard pattern as the Rust bench. +- **No barrier.** Smoldot startup is variable (5–30s) and gossip readiness is + not observable from JSON-RPC, so a barrier on a code line gives false + confidence. We use a fixed `--warmup-ms` instead and pace rounds by wall clock. +- **`--seed` removed.** Always derives sr25519 keys via `//StatementClient//${idx}`, + matching `sc_statement_store::test_utils::get_keypair`. +- **Output lines** otherwise match `bench.rs` verbatim so log parsers keep + working: `Starting Statement Store Latency Benchmark: …`, `Spawning {N} + client tasks... {testRunId}`, `Benchmark Results: send_min=…`, `Benchmark + Failed: failed_clients=…`, `Benchmark Finished: rounds_with_any_success=…`. diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js new file mode 100755 index 0000000000..ed008bb989 --- /dev/null +++ b/examples/statement-latency-bench/bench.js @@ -0,0 +1,244 @@ +#!/usr/bin/env node +import { parseArgs } from "node:util"; +import { randomBytes } from "node:crypto"; +import { start } from "smoldot"; +import { loadChainSpec, spliceBootnodes } from "./chainspec.js"; +import { getKeypair } from "./keypair.js"; +import { SmoldotRpc } from "./smoldot-rpc.js"; +import { runClient } from "./client.js"; +import { FailureKind, fail, reportResults } from "./stats.js"; + +const LEVEL = { ERROR: 1, WARN: 2, INFO: 3, DEBUG: 4, TRACE: 5 }; +const LEVEL_LABEL = { 1: "ERROR", 2: "WARN", 3: "INFO", 4: "DEBUG", 5: "TRACE" }; + +function makeLogger(maxLevel) { + const emit = (lvl, msg) => { + if (lvl > maxLevel) return; + const stream = lvl <= LEVEL.WARN ? process.stderr : process.stdout; + stream.write(`[${LEVEL_LABEL[lvl]}] ${msg}\n`); + }; + return { + error: (msg) => emit(LEVEL.ERROR, msg), + warn: (msg) => emit(LEVEL.WARN, msg), + info: (msg) => emit(LEVEL.INFO, msg), + debug: (msg) => emit(LEVEL.DEBUG, msg), + trace: (msg) => emit(LEVEL.TRACE, msg), + forSmoldot: (lvl, target, message) => emit(lvl, `[${target}] ${message}`), + }; +} + +function parseMessagesPattern(pattern) { + return pattern.split(",").map((part) => { + const [c, s] = part.trim().split(":"); + if (c === undefined || s === undefined) { + throw new Error(`Invalid pattern '${part}'. Expected 'count:size'`); + } + const count = Number.parseInt(c, 10); + const size = Number.parseInt(s, 10); + if (!Number.isFinite(count) || !Number.isFinite(size)) { + throw new Error(`Invalid count/size in pattern '${part}'`); + } + return [count, size]; + }); +} + +function parseFlags(argv) { + const { values } = parseArgs({ + args: argv, + options: { + "chain-spec": { type: "string" }, + "relay-chain-spec": { type: "string" }, + bootnodes: { type: "string" }, + "false-positive-rate": { type: "string", default: "0.01" }, + "num-clients": { type: "string", default: "100" }, + "num-rounds": { type: "string", default: "1" }, + "messages-pattern": { type: "string", default: "5:512" }, + "receive-timeout-ms": { type: "string", default: "5000" }, + "interval-ms": { type: "string", default: "10000" }, + "statement-expiry-ms": { type: "string", default: "600000" }, + "warmup-ms": { type: "string", default: "15000" }, + "fail-fast": { type: "boolean", default: false }, + "log-level": { type: "string", default: "info" }, + }, + strict: true, + }); + + if (!values["chain-spec"]) { + throw new Error("--chain-spec is required"); + } + + const numClients = Number.parseInt(values["num-clients"], 10); + const numRounds = Number.parseInt(values["num-rounds"], 10); + if (!(numClients > 0)) throw new Error(`--num-clients must be > 0`); + if (!(numRounds > 0)) throw new Error(`--num-rounds must be > 0`); + + return { + chainSpecPath: values["chain-spec"], + relayChainSpecPath: values["relay-chain-spec"], + bootnodes: values.bootnodes ? values.bootnodes.split(",").map((b) => b.trim()).filter(Boolean) : [], + falsePositiveRate: Number.parseFloat(values["false-positive-rate"]), + numClients, + numRounds, + messagesPattern: parseMessagesPattern(values["messages-pattern"]), + receiveTimeoutMs: Number.parseInt(values["receive-timeout-ms"], 10), + intervalMs: Number.parseInt(values["interval-ms"], 10), + statementExpiryMs: Number.parseInt(values["statement-expiry-ms"], 10), + warmupMs: Number.parseInt(values["warmup-ms"], 10), + failFast: values["fail-fast"], + logLevel: LEVEL[values["log-level"].toUpperCase()] ?? LEVEL.INFO, + }; +} + +function logConfiguration(log, args) { + const pattern = args.messagesPattern.map(([c, s]) => `${c}x${s}B`).join(", "); + log.info( + `Starting Statement Store Latency Benchmark: ` + + `chain_spec=${args.chainSpecPath} ` + + (args.relayChainSpecPath ? `relay_chain_spec=${args.relayChainSpecPath} ` : "") + + `bootnodes=${args.bootnodes.length} ` + + `clients=${args.numClients} rounds=${args.numRounds} ` + + `interval=${args.intervalMs}ms pattern=[${pattern}]`, + ); +} + +async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { + const smoldot = start({ + maxLogLevel: 3, + logCallback: (lvl, target, msg) => log.forSmoldot(lvl, target, msg), + }); + + const chains = []; + let relayChain = null; + if (relaySpec) { + relayChain = await smoldot.addChain({ chainSpec: relaySpec, disableJsonRpc: true }); + chains.push(relayChain); + } + + const parachain = await smoldot.addChain({ + chainSpec: parachainSpec, + potentialRelayChains: relayChain ? [relayChain] : [], + statementStore: { falsePositiveRate: args.falsePositiveRate }, + }); + chains.push(parachain); + + const rpc = new SmoldotRpc(parachain, { + onUnexpected: (e) => log.warn(`client ${clientId} rpc: ${e.message}`), + }); + + return { + smoldot, + chains, + rpc, + cleanup: async () => { + rpc.stop(); + try { + for (const c of chains) c.remove(); + } catch {} + try { + await smoldot.terminate(); + } catch {} + }, + }; +} + +async function main() { + let args; + try { + args = parseFlags(process.argv.slice(2)); + } catch (e) { + process.stderr.write(`Error: ${e.message}\n`); + process.exit(2); + } + + const log = makeLogger(args.logLevel); + const testRunId = randomBytes(8).readBigUInt64LE(0).toString(); + + logConfiguration(log, args); + + const parachainSpecRaw = await loadChainSpec(args.chainSpecPath); + const parachainSpec = spliceBootnodes(parachainSpecRaw, args.bootnodes); + const relaySpec = args.relayChainSpecPath ? await loadChainSpec(args.relayChainSpecPath) : null; + + log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); + + const abortController = new AbortController(); + const handles = []; + for (let clientId = 0; clientId < args.numClients; clientId++) { + handles.push((async () => { + let resources; + try { + resources = await spawnClient({ clientId, args, parachainSpec, relaySpec, log }); + } catch (e) { + return { + successes: [], + failures: [ + fail(log, clientId, null, FailureKind.TaskPanicked, `failed to start smoldot: ${e.message}`), + ], + }; + } + + try { + const pair = await getKeypair(clientId); + const config = { + clientId, + neighbourId: (clientId + 1) % args.numClients, + numClients: args.numClients, + numRounds: args.numRounds, + testRunId, + messagesPattern: args.messagesPattern, + receiveTimeoutMs: args.receiveTimeoutMs, + intervalMs: args.intervalMs, + statementExpiryMs: args.statementExpiryMs, + failFast: args.failFast, + }; + + // Best-effort warm-up: smoldot has no JSON-RPC signal for "I have a + // statement-store peer". A fixed sleep is a coarse heuristic; tune via + // --warmup-ms per network. + await new Promise((r) => setTimeout(r, args.warmupMs)); + + const result = await runClient({ + config, + rpc: resources.rpc, + pair, + abortSignal: abortController.signal, + log, + }); + + if (args.failFast && result.failures.length > 0) { + abortController.abort(); + } + + return result; + } catch (e) { + return { + successes: [], + failures: [ + fail(log, clientId, null, FailureKind.TaskPanicked, e?.message ?? String(e)), + ], + }; + } finally { + await resources.cleanup(); + } + })()); + } + + const results = await Promise.all(handles); + const allSuccesses = []; + const allFailures = []; + for (const r of results) { + allSuccesses.push(...r.successes); + allFailures.push(...r.failures); + } + + reportResults(log, allSuccesses, allFailures, args.numClients, args.numRounds); + + if (allFailures.length > 0 && allSuccesses.length === 0) { + process.exit(1); + } +} + +main().catch((e) => { + process.stderr.write(`fatal: ${e?.stack ?? e}\n`); + process.exit(1); +}); diff --git a/examples/statement-latency-bench/chainspec.js b/examples/statement-latency-bench/chainspec.js new file mode 100644 index 0000000000..4439f94131 --- /dev/null +++ b/examples/statement-latency-bench/chainspec.js @@ -0,0 +1,16 @@ +import { readFile } from "node:fs/promises"; + +export async function loadChainSpec(path) { + return await readFile(path, "utf8"); +} + +// `AddChainOptions.chainSpec` has no separate bootnodes field; bootnodes must +// be embedded in the spec JSON. We append rather than replace so any bootnodes +// already in the spec (e.g. public ones) keep working alongside CLI-provided ones. +export function spliceBootnodes(specJson, bootnodes) { + if (!bootnodes?.length) return specJson; + const spec = JSON.parse(specJson); + const existing = Array.isArray(spec.bootNodes) ? spec.bootNodes : []; + spec.bootNodes = [...existing, ...bootnodes]; + return JSON.stringify(spec); +} diff --git a/examples/statement-latency-bench/client.js b/examples/statement-latency-bench/client.js new file mode 100644 index 0000000000..735ccbd4c7 --- /dev/null +++ b/examples/statement-latency-bench/client.js @@ -0,0 +1,250 @@ +import { blake2AsU8a } from "@polkadot/util-crypto"; +import { u8aToHex } from "@polkadot/util"; +import { encodeStatement, expiryFromParts } from "./statement.js"; +import { FailureKind, fail } from "./stats.js"; + +const MAX_TOPICS = 128; // bench.rs:249 — BoundedVec> on the subscribe side + +const enc = new TextEncoder(); + +function generateTopic(testRunId, clientId, round, msgIdx) { + const s = `${testRunId}-${clientId}-${round}-${msgIdx}`; + return blake2AsU8a(enc.encode(s), 256); +} + +function u32LeBytes(n) { + const out = new Uint8Array(4); + new DataView(out.buffer).setUint32(0, n >>> 0, true); + return out; +} + +function messagesPerClient(pattern) { + return pattern.reduce((sum, [count]) => sum + count, 0); +} + +function isLeader(clientId) { + return clientId === 0; +} + +function sleep(ms, signal) { + return new Promise((resolve, reject) => { + if (signal?.aborted) return reject(new Error("aborted")); + const t = setTimeout(resolve, ms); + signal?.addEventListener( + "abort", + () => { + clearTimeout(t); + reject(new Error("aborted")); + }, + { once: true }, + ); + }); +} + +function withTimeout(promise, ms) { + let timer; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error("timeout")), ms); + }); + return Promise.race([promise.then((v) => v), timeout]).finally(() => + clearTimeout(timer), + ); +} + +async function executeRound({ round, config, rpc, pair, log }) { + const { + clientId, + neighbourId, + numRounds, + testRunId, + messagesPattern, + receiveTimeoutMs, + statementExpiryMs, + } = config; + + const expectedCount = messagesPerClient(messagesPattern); + if (expectedCount > MAX_TOPICS) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.TooManyTopics, + `max ${MAX_TOPICS}, got ${expectedCount}`, + ); + } + + const roundStart = performance.now(); + + const expectedTopics = []; + for (let idx = 0; idx < expectedCount; idx++) { + expectedTopics.push(u8aToHex(generateTopic(testRunId, neighbourId, round, idx))); + } + + let subscription; + try { + subscription = await rpc.subscribe("statement_subscribeStatement", [ + { matchAny: expectedTopics }, + ]); + } catch (e) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubscribeFailed, + e.message, + ); + } + + let sentCount = 0; + try { + for (const [count, size] of messagesPattern) { + for (let i = 0; i < count; i++) { + const topic = generateTopic(testRunId, clientId, round, sentCount); + const channel = blake2AsU8a(u32LeBytes(sentCount), 256); + const expiryTs = Math.floor((Date.now() + statementExpiryMs) / 1000); + const sequence = (sentCount + 1) * round; + const expiry = expiryFromParts(expiryTs, sequence); + const data = new Uint8Array(size); // zero-filled; bench.rs:283 does the same + + const hex = encodeStatement({ pair, expiry, channel, topic, data }); + + let result; + try { + result = await rpc.request("statement_submit", [hex]); + } catch (e) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubmitFailed, + e.message, + ); + } + + sentCount += 1; + if (isLeader(clientId)) { + log.debug( + `Round ${round}/${numRounds}. Sent ${sentCount} statement(s): ${JSON.stringify(result)}`, + ); + } + } + } + + const sendDuration = (performance.now() - roundStart) / 1000; + + let receivedCount = 0; + while (receivedCount < expectedCount) { + let payload; + try { + payload = await withTimeout(subscription.next(), receiveTimeoutMs); + } catch (e) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.PropagationTimeout, + `received ${receivedCount}/${expectedCount} after ${receiveTimeoutMs}ms`, + ); + } + + if (payload === null) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubscriptionClosed, + `received ${receivedCount}/${expectedCount}`, + ); + } + + if (payload?.event === "newStatements" && Array.isArray(payload.data?.statements)) { + const batch = payload.data.statements.length; + receivedCount += batch; + if (isLeader(clientId)) { + log.debug( + `Round ${round}/${numRounds}. Received ${receivedCount} statement(s) (batch of ${batch})`, + ); + } + } else { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubscriptionStreamError, + `received ${receivedCount}/${expectedCount}, unexpected payload: ${JSON.stringify(payload)}`, + ); + } + } + + const fullLatency = (performance.now() - roundStart) / 1000; + const receiveDuration = fullLatency - sendDuration; + + if (isLeader(clientId)) { + log.debug( + `Round ${round}/${numRounds} complete. ` + + `Send: ${sendDuration.toFixed(3)}s, ` + + `Receive: ${receiveDuration.toFixed(3)}s, ` + + `Total: ${fullLatency.toFixed(3)}s`, + ); + } + + return { + round, + sent_count: sentCount, + received_count: receivedCount, + send_duration_secs: sendDuration, + receive_duration_secs: receiveDuration, + full_latency_secs: fullLatency, + }; + } finally { + try { + await rpc.unsubscribe("statement_unsubscribeStatement", subscription.id); + } catch { + // best-effort; if the subscription is already gone we don't care + } + } +} + +export async function runClient({ config, rpc, pair, abortSignal, log }) { + const successes = []; + const failures = []; + + // Apply jitter to distribute submission load (bench.rs:423-424). + const submissionJitter = (config.clientId * 7) % 1000; + await sleep(submissionJitter, abortSignal).catch(() => {}); + + for (let round = 1; round <= config.numRounds; round++) { + if (abortSignal?.aborted) { + failures.push(FailureKind.PeerFailed); + break; + } + + const roundStart = performance.now(); + const result = await executeRound({ round, config, rpc, pair, log }); + + if (typeof result === "string") { + failures.push(result); + if (config.failFast) break; + } else { + successes.push(result); + } + + if (round < config.numRounds) { + const elapsedMs = performance.now() - roundStart; + if (elapsedMs < config.intervalMs) { + try { + await sleep(config.intervalMs - elapsedMs, abortSignal); + } catch { + failures.push(FailureKind.PeerFailed); + break; + } + } else if (isLeader(config.clientId)) { + log.warn( + `Client ${config.clientId}: Round ${round} took longer (${Math.round(elapsedMs)}ms) than target (${config.intervalMs}ms)`, + ); + } + } + } + + return { successes, failures }; +} diff --git a/examples/statement-latency-bench/keypair.js b/examples/statement-latency-bench/keypair.js new file mode 100644 index 0000000000..3b1ac25f0a --- /dev/null +++ b/examples/statement-latency-bench/keypair.js @@ -0,0 +1,19 @@ +import { Keyring } from "@polkadot/keyring"; +import { cryptoWaitReady } from "@polkadot/util-crypto"; + +let keyring = null; + +async function getKeyring() { + if (!keyring) { + await cryptoWaitReady(); + keyring = new Keyring({ type: "sr25519" }); + } + return keyring; +} + +// Mirrors sc_statement_store::test_utils::get_keypair (substrate), +// which derives `//StatementClient//{idx}`. +export async function getKeypair(idx) { + const ring = await getKeyring(); + return ring.createFromUri(`//StatementClient//${idx}`); +} diff --git a/examples/statement-latency-bench/package.json b/examples/statement-latency-bench/package.json new file mode 100644 index 0000000000..ae7cc919d9 --- /dev/null +++ b/examples/statement-latency-bench/package.json @@ -0,0 +1,21 @@ +{ + "name": "statement-latency-bench", + "version": "0.1.0", + "private": true, + "type": "module", + "bin": { + "statement-latency-bench": "./bench.js" + }, + "scripts": { + "start": "node bench.js" + }, + "dependencies": { + "@polkadot/keyring": "^13.0.2", + "@polkadot/util": "^13.0.2", + "@polkadot/util-crypto": "^13.0.2", + "smoldot": "file:../../wasm-node/javascript" + }, + "engines": { + "node": ">=20" + } +} diff --git a/examples/statement-latency-bench/smoldot-rpc.js b/examples/statement-latency-bench/smoldot-rpc.js new file mode 100644 index 0000000000..8b49f4d919 --- /dev/null +++ b/examples/statement-latency-bench/smoldot-rpc.js @@ -0,0 +1,113 @@ +// JSON-RPC plumbing for a smoldot Chain. Maintains: +// - request-id counter and pendingRequests Map for request/response correlation +// - per-subscription queues for notifications +// - a single background loop pulling from chain.nextJsonRpcResponse() +// +// Adapted from examples/statement-chat/main.js (PR #3143). +export class SmoldotRpc { + constructor(chain, { onUnexpected } = {}) { + this.chain = chain; + this.nextId = 1; + this.pending = new Map(); + this.subs = new Map(); // subscriptionId -> { queue: [], waiters: [], closed } + this.onUnexpected = onUnexpected ?? (() => {}); + this.stopped = false; + this.loop = this.#run(); + } + + async request(method, params) { + const id = String(this.nextId++); + const body = JSON.stringify({ jsonrpc: "2.0", id, method, params }); + const promise = new Promise((resolve, reject) => { + this.pending.set(id, { resolve, reject }); + }); + this.chain.sendJsonRpc(body); + return promise; + } + + // Subscribe and return { id, next: () => Promise }. + // `next()` resolves with the next notification payload, or `null` once the + // subscription is closed. + async subscribe(subscribeMethod, params) { + const id = await this.request(subscribeMethod, params); + const sub = { queue: [], waiters: [], closed: false }; + this.subs.set(id, sub); + return { + id, + next: () => { + if (sub.queue.length > 0) return Promise.resolve(sub.queue.shift()); + if (sub.closed) return Promise.resolve(null); + return new Promise((resolve) => sub.waiters.push(resolve)); + }, + close: () => this.#closeSub(id), + }; + } + + async unsubscribe(unsubscribeMethod, subscriptionId) { + try { + await this.request(unsubscribeMethod, [subscriptionId]); + } finally { + this.#closeSub(subscriptionId); + } + } + + #closeSub(id) { + const sub = this.subs.get(id); + if (!sub || sub.closed) return; + sub.closed = true; + for (const w of sub.waiters) w(null); + sub.waiters.length = 0; + this.subs.delete(id); + } + + stop() { + this.stopped = true; + for (const { reject } of this.pending.values()) { + reject(new Error("RPC client stopped")); + } + this.pending.clear(); + for (const id of [...this.subs.keys()]) this.#closeSub(id); + } + + async #run() { + while (!this.stopped) { + let raw; + try { + raw = await this.chain.nextJsonRpcResponse(); + } catch (e) { + if (!this.stopped) this.onUnexpected(e); + return; + } + let msg; + try { + msg = JSON.parse(raw); + } catch (e) { + this.onUnexpected(new Error(`bad JSON-RPC payload: ${e.message}`)); + continue; + } + + if (msg.id !== undefined) { + const pending = this.pending.get(msg.id); + if (!pending) { + this.onUnexpected(new Error(`response for unknown id ${msg.id}`)); + continue; + } + this.pending.delete(msg.id); + if (msg.error) pending.reject(new Error(`JSON-RPC error: ${msg.error.message}`)); + else pending.resolve(msg.result); + continue; + } + + if (msg.method && msg.params?.subscription !== undefined) { + const sub = this.subs.get(msg.params.subscription); + if (!sub) continue; // stray notification (likely after unsubscribe) + const payload = msg.params.result; + if (sub.waiters.length > 0) sub.waiters.shift()(payload); + else sub.queue.push(payload); + continue; + } + + this.onUnexpected(new Error(`unrecognized JSON-RPC message: ${raw}`)); + } + } +} diff --git a/examples/statement-latency-bench/statement.js b/examples/statement-latency-bench/statement.js new file mode 100644 index 0000000000..74fe888775 --- /dev/null +++ b/examples/statement-latency-bench/statement.js @@ -0,0 +1,176 @@ +import { compactToU8a, compactFromU8a, u8aConcat, u8aToHex, hexToU8a } from "@polkadot/util"; + +// Field discriminants (lib/src/network/codec/statement.rs:35-46). +const FIELD_PROOF = 0; +const FIELD_DECRYPTION_KEY = 1; +const FIELD_EXPIRY = 2; +const FIELD_CHANNEL = 3; +const FIELD_TOPIC_START = 4; +const FIELD_TOPIC_END = 7; +const FIELD_DATA = 8; + +const PROOF_SR25519 = 0; +const PROOF_ED25519 = 1; +const PROOF_SECP256K1_ECDSA = 2; +const PROOF_ON_CHAIN = 3; + +function u64ToLeBytes(n) { + const out = new Uint8Array(8); + const view = new DataView(out.buffer); + view.setBigUint64(0, BigInt(n), true); + return out; +} + +function leBytesToU64(bytes) { + return new DataView(bytes.buffer, bytes.byteOffset, 8).getBigUint64(0, true); +} + +// Pack (timestamp_secs, sequence) into the u64 expiry per +// sp_statement_store::Statement::set_expiry_from_parts: +// expiry = (timestamp_secs as u64) << 32 | sequence as u64 +export function expiryFromParts(timestampSecs, sequence) { + return (BigInt(timestampSecs) << 32n) | BigInt(sequence); +} + +// Build the bytes signed over by the proof, mirroring +// sp_statement_store::Statement::encoded(for_signing=true) in polkadot-sdk +// (substrate/primitives/statement-store/src/lib.rs:736-780): no leading +// field-count compact, no proof field, everything else in ascending discriminant +// order. +function buildSignatureMaterial({ expiry, channel, topic, data }) { + const parts = []; + + parts.push(new Uint8Array([FIELD_EXPIRY])); + parts.push(u64ToLeBytes(expiry)); + + if (channel) { + parts.push(new Uint8Array([FIELD_CHANNEL])); + parts.push(channel); + } + + parts.push(new Uint8Array([FIELD_TOPIC_START])); + parts.push(topic); + + parts.push(new Uint8Array([FIELD_DATA])); + parts.push(compactToU8a(data.length)); + parts.push(data); + + return u8aConcat(...parts); +} + +// Encode a full sr25519-signed statement on the wire. +// +// Wire format (matches both lib/src/network/codec/statement.rs:269-325 and +// sp_statement_store::Statement::encoded(false)): SCALE-compact field count, +// then ascending (discriminant, value) pairs. +export function encodeStatement({ pair, expiry, channel, topic, data }) { + const material = buildSignatureMaterial({ expiry, channel, topic, data }); + const signature = pair.sign(material); + const signer = pair.publicKey; + + let numFields = 1; // proof + numFields += 1; // expiry (always present) + if (channel) numFields += 1; + numFields += 1; // one topic + numFields += 1; // data + + const parts = []; + parts.push(compactToU8a(numFields)); + + parts.push(new Uint8Array([FIELD_PROOF, PROOF_SR25519])); + parts.push(signature); + parts.push(signer); + + parts.push(new Uint8Array([FIELD_EXPIRY])); + parts.push(u64ToLeBytes(expiry)); + + if (channel) { + parts.push(new Uint8Array([FIELD_CHANNEL])); + parts.push(channel); + } + + parts.push(new Uint8Array([FIELD_TOPIC_START])); + parts.push(topic); + + parts.push(new Uint8Array([FIELD_DATA])); + parts.push(compactToU8a(data.length)); + parts.push(data); + + return u8aToHex(u8aConcat(...parts)); +} + +// Best-effort decode for inspection / sanity checks. Skips fields we don't need +// in detail; pulls out proof signer, topic(s), and data. +export function decodeStatement(hex) { + const bytes = hexToU8a(hex); + let offset = 0; + + const [fieldCountSize, numFieldsBn] = compactFromU8a(bytes.subarray(offset)); + offset += fieldCountSize; + const numFields = numFieldsBn.toNumber(); + + let proof = null; + const topics = []; + let data = null; + let channel = null; + let expiry = null; + + for (let i = 0; i < numFields; i++) { + const disc = bytes[offset++]; + switch (disc) { + case FIELD_PROOF: { + const proofType = bytes[offset++]; + if (proofType === PROOF_SR25519 || proofType === PROOF_ED25519) { + const signature = bytes.slice(offset, offset + 64); + offset += 64; + const signer = bytes.slice(offset, offset + 32); + offset += 32; + proof = { + type: proofType === PROOF_SR25519 ? "Sr25519" : "Ed25519", + signature, + signer, + }; + } else if (proofType === PROOF_SECP256K1_ECDSA) { + offset += 65 + 33; + proof = { type: "Secp256k1Ecdsa" }; + } else if (proofType === PROOF_ON_CHAIN) { + offset += 32 + 32 + 8; + proof = { type: "OnChain" }; + } else { + throw new Error(`Unknown proof type: ${proofType}`); + } + break; + } + case FIELD_DECRYPTION_KEY: + offset += 32; + break; + case FIELD_EXPIRY: + expiry = leBytesToU64(bytes.slice(offset, offset + 8)); + offset += 8; + break; + case FIELD_CHANNEL: + channel = bytes.slice(offset, offset + 32); + offset += 32; + break; + case FIELD_TOPIC_START: + case FIELD_TOPIC_START + 1: + case FIELD_TOPIC_START + 2: + case FIELD_TOPIC_END: + topics.push(bytes.slice(offset, offset + 32)); + offset += 32; + break; + case FIELD_DATA: { + const [lenSize, lenBn] = compactFromU8a(bytes.subarray(offset)); + const len = lenBn.toNumber(); + offset += lenSize; + data = bytes.slice(offset, offset + len); + offset += len; + break; + } + default: + throw new Error(`Unknown field discriminant: ${disc}`); + } + } + + return { proof, expiry, channel, topics, data }; +} diff --git a/examples/statement-latency-bench/stats.js b/examples/statement-latency-bench/stats.js new file mode 100644 index 0000000000..18a24a0284 --- /dev/null +++ b/examples/statement-latency-bench/stats.js @@ -0,0 +1,61 @@ +// Closed set of failure categories. Mirrors bench.rs:118-129. +export const FailureKind = { + TooManyTopics: "Too many topics", + SubscribeFailed: "Failed to open RPC subscription", + SubmitFailed: "Failed to submit statement via RPC", + PropagationTimeout: "Statement propagation timeout", + SubscriptionClosed: "Subscription closed by server", + SubscriptionStreamError: "Subscription stream error", + PeerFailed: "Peer failed; stopping early", + TaskPanicked: "Task panicked", +}; + +// Logs the failure with bench.rs's wording and returns the kind so callers can +// `return fail(...)`. Mirrors bench.rs:148-161. +export function fail(log, clientId, roundInfo, kind, detail) { + const prefix = roundInfo ? `Round ${roundInfo[0]}/${roundInfo[1]}: ` : ""; + log.warn(`Client ${clientId}: ${prefix}${kind} (${detail})`); + return kind; +} + +function calcStats(values) { + const arr = Array.from(values); + const min = arr.reduce((a, b) => Math.min(a, b), Infinity); + const max = arr.reduce((a, b) => Math.max(a, b), -Infinity); + const avg = arr.reduce((a, b) => a + b, 0) / arr.length; + return { min, avg, max }; +} + +function f3(n) { + return n.toFixed(3); +} + +export function reportResults(log, successes, failures, numClients, numRounds) { + if (failures.length > 0) { + const counts = new Map(); + for (const k of failures) counts.set(k, (counts.get(k) ?? 0) + 1); + const sorted = [...counts.entries()].sort((a, b) => b[1] - a[1]); + const errorsStr = sorted.map(([k, c]) => `${k} (${c})`).join("; "); + log.warn( + `Benchmark Failed: failed_clients=${failures.length} total_clients=${numClients} errors=[${errorsStr}]`, + ); + } + + if (successes.length > 0) { + const send = calcStats(successes.map((s) => s.send_duration_secs)); + const receive = calcStats(successes.map((s) => s.receive_duration_secs)); + const latency = calcStats(successes.map((s) => s.full_latency_secs)); + log.info( + `Benchmark Results: ` + + `send_min=${f3(send.min)}s send_avg=${f3(send.avg)}s send_max=${f3(send.max)}s ` + + `receive_min=${f3(receive.min)}s receive_avg=${f3(receive.avg)}s receive_max=${f3(receive.max)}s ` + + `latency_min=${f3(latency.min)}s latency_avg=${f3(latency.avg)}s latency_max=${f3(latency.max)}s`, + ); + } + + const roundsWithAnySuccess = new Set(successes.map((s) => s.round)).size; + log.info( + `Benchmark Finished: rounds_with_any_success=${roundsWithAnySuccess} ` + + `total_rounds=${numRounds} total_clients=${numClients}`, + ); +} From 7236b528db476d4d3bdf9b017590399f892ae5c8 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 30 Apr 2026 15:14:45 +0200 Subject: [PATCH 02/12] Require relay/parachain spec flags; rename --chain-spec to --parachain-spec --- examples/statement-latency-bench/README.md | 12 ++++----- examples/statement-latency-bench/bench.js | 31 ++++++++++------------ 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/examples/statement-latency-bench/README.md b/examples/statement-latency-bench/README.md index 02b9d29591..77dcbd13c5 100644 --- a/examples/statement-latency-bench/README.md +++ b/examples/statement-latency-bench/README.md @@ -19,7 +19,7 @@ npm install ```bash node bench.js \ - --chain-spec ./chain-specs/parachain.json \ + --parachain-spec ./chain-specs/parachain.json \ --relay-chain-spec ./chain-specs/relay.json \ --bootnodes /ip4/127.0.0.1/tcp/30333/ws/p2p/12D3KooW… \ --num-clients 4 \ @@ -29,16 +29,16 @@ node bench.js \ --interval-ms 5000 ``` -If `--chain-spec` is a parachain spec, pass `--relay-chain-spec` too. Bootnodes -in `--bootnodes` are appended to whatever is already in the spec's `bootNodes` -field. +Both `--parachain-spec` (parachain) and `--relay-chain-spec` are required. +Bootnodes in `--bootnodes` are appended to whatever is already in the +parachain spec's `bootNodes` field. ## Flags | Flag | Default | Notes | |---|---|---| -| `--chain-spec` | required | Raw chain spec JSON path. | -| `--relay-chain-spec` | — | Required if the parachain spec references a relay. | +| `--parachain-spec` | required | Parachain raw chain spec JSON path. | +| `--relay-chain-spec` | required | Relay chain raw chain spec JSON path. | | `--bootnodes` | — | Comma-separated multiaddrs, appended to the spec. | | `--false-positive-rate` | `0.01` | Statement-store affinity bloom filter rate. | | `--num-clients` | `100` | Each spawns its own smoldot instance. | diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js index ed008bb989..6592143ff7 100755 --- a/examples/statement-latency-bench/bench.js +++ b/examples/statement-latency-bench/bench.js @@ -46,7 +46,7 @@ function parseFlags(argv) { const { values } = parseArgs({ args: argv, options: { - "chain-spec": { type: "string" }, + "parachain-spec": { type: "string" }, "relay-chain-spec": { type: "string" }, bootnodes: { type: "string" }, "false-positive-rate": { type: "string", default: "0.01" }, @@ -63,8 +63,11 @@ function parseFlags(argv) { strict: true, }); - if (!values["chain-spec"]) { - throw new Error("--chain-spec is required"); + if (!values["parachain-spec"]) { + throw new Error("--parachain-spec is required"); + } + if (!values["relay-chain-spec"]) { + throw new Error("--relay-chain-spec is required"); } const numClients = Number.parseInt(values["num-clients"], 10); @@ -73,7 +76,7 @@ function parseFlags(argv) { if (!(numRounds > 0)) throw new Error(`--num-rounds must be > 0`); return { - chainSpecPath: values["chain-spec"], + parachainSpecPath: values["parachain-spec"], relayChainSpecPath: values["relay-chain-spec"], bootnodes: values.bootnodes ? values.bootnodes.split(",").map((b) => b.trim()).filter(Boolean) : [], falsePositiveRate: Number.parseFloat(values["false-positive-rate"]), @@ -93,8 +96,8 @@ function logConfiguration(log, args) { const pattern = args.messagesPattern.map(([c, s]) => `${c}x${s}B`).join(", "); log.info( `Starting Statement Store Latency Benchmark: ` + - `chain_spec=${args.chainSpecPath} ` + - (args.relayChainSpecPath ? `relay_chain_spec=${args.relayChainSpecPath} ` : "") + + `parachain_spec=${args.parachainSpecPath} ` + + `relay_chain_spec=${args.relayChainSpecPath} ` + `bootnodes=${args.bootnodes.length} ` + `clients=${args.numClients} rounds=${args.numRounds} ` + `interval=${args.intervalMs}ms pattern=[${pattern}]`, @@ -107,19 +110,13 @@ async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { logCallback: (lvl, target, msg) => log.forSmoldot(lvl, target, msg), }); - const chains = []; - let relayChain = null; - if (relaySpec) { - relayChain = await smoldot.addChain({ chainSpec: relaySpec, disableJsonRpc: true }); - chains.push(relayChain); - } - + const relayChain = await smoldot.addChain({ chainSpec: relaySpec, disableJsonRpc: true }); const parachain = await smoldot.addChain({ chainSpec: parachainSpec, - potentialRelayChains: relayChain ? [relayChain] : [], + potentialRelayChains: [relayChain], statementStore: { falsePositiveRate: args.falsePositiveRate }, }); - chains.push(parachain); + const chains = [relayChain, parachain]; const rpc = new SmoldotRpc(parachain, { onUnexpected: (e) => log.warn(`client ${clientId} rpc: ${e.message}`), @@ -155,9 +152,9 @@ async function main() { logConfiguration(log, args); - const parachainSpecRaw = await loadChainSpec(args.chainSpecPath); + const parachainSpecRaw = await loadChainSpec(args.parachainSpecPath); const parachainSpec = spliceBootnodes(parachainSpecRaw, args.bootnodes); - const relaySpec = args.relayChainSpecPath ? await loadChainSpec(args.relayChainSpecPath) : null; + const relaySpec = await loadChainSpec(args.relayChainSpecPath); log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); From 22648b7fa055719f66f4be971a0f0ffda7fc96c3 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 30 Apr 2026 15:31:14 +0200 Subject: [PATCH 03/12] Accept URL or path for chain specs; drop --bootnodes --- examples/statement-latency-bench/README.md | 40 ++++++++++++------- examples/statement-latency-bench/bench.js | 20 +++++----- examples/statement-latency-bench/chainspec.js | 26 ++++++------ 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/examples/statement-latency-bench/README.md b/examples/statement-latency-bench/README.md index 77dcbd13c5..13fe736b0b 100644 --- a/examples/statement-latency-bench/README.md +++ b/examples/statement-latency-bench/README.md @@ -17,29 +17,38 @@ npm install ## Usage +`--parachain-spec` and `--relay-chain-spec` each accept either a local file +path or an `http(s)://` URL. Bootnodes are expected to be embedded in the +spec; the canonical source is the [paritytech/chainspecs](https://github.com/paritytech/chainspecs) +repo, and smoldot bundles popular ones under `../../demo-chain-specs/`. + ```bash +# bundled spec from the smoldot repo +node bench.js \ + --parachain-spec ../../demo-chain-specs/polkadot_asset_hub.json \ + --relay-chain-spec ../../demo-chain-specs/polkadot.json \ + --num-clients 4 --num-rounds 2 --messages-pattern "3:128" \ + --receive-timeout-ms 30000 --interval-ms 5000 + +# raw URL straight from polkadot-sdk (paritytech/chainspecs is a symlink-only +# index repo; raw.githubusercontent doesn't follow its symlinks across submodules, +# so point at the underlying files directly) node bench.js \ - --parachain-spec ./chain-specs/parachain.json \ - --relay-chain-spec ./chain-specs/relay.json \ - --bootnodes /ip4/127.0.0.1/tcp/30333/ws/p2p/12D3KooW… \ - --num-clients 4 \ - --num-rounds 2 \ - --messages-pattern "3:128" \ - --receive-timeout-ms 30000 \ - --interval-ms 5000 + --parachain-spec https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/cumulus/parachains/chain-specs/asset-hub-polkadot.json \ + --relay-chain-spec https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/polkadot/node/service/chain-specs/polkadot.json \ + --num-clients 4 --num-rounds 2 --messages-pattern "3:128" ``` -Both `--parachain-spec` (parachain) and `--relay-chain-spec` are required. -Bootnodes in `--bootnodes` are appended to whatever is already in the -parachain spec's `bootNodes` field. +For a local dev network, splice your bootnode into the parachain spec once +(e.g. with `jq`, like `examples/statement-chat/dev.sh`) and pass the result +as `--parachain-spec`. ## Flags | Flag | Default | Notes | |---|---|---| -| `--parachain-spec` | required | Parachain raw chain spec JSON path. | -| `--relay-chain-spec` | required | Relay chain raw chain spec JSON path. | -| `--bootnodes` | — | Comma-separated multiaddrs, appended to the spec. | +| `--parachain-spec` | required | Parachain raw chain spec — file path or `http(s)://` URL. | +| `--relay-chain-spec` | required | Relay chain raw chain spec — file path or `http(s)://` URL. | | `--false-positive-rate` | `0.01` | Statement-store affinity bloom filter rate. | | `--num-clients` | `100` | Each spawns its own smoldot instance. | | `--num-rounds` | `1` | | @@ -54,7 +63,8 @@ parachain spec's `bootNodes` field. ## Differences from `bench.rs` - **Connection model.** Rust connects to N WS endpoints; smoldot joins the - network directly. Bootnodes replace `--rpc-endpoints` as the wiring point. + network directly. Chain specs (with embedded bootnodes) replace + `--rpc-endpoints` as the wiring point. - **One smoldot per client.** Smoldot dedupes within a client, so one peer per client requires one `start()` per client. Each is ~tens of MB resident; for N=100 expect multi-GB RAM. Use the same K8s-shard pattern as the Rust bench. diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js index 6592143ff7..0f8927dba9 100755 --- a/examples/statement-latency-bench/bench.js +++ b/examples/statement-latency-bench/bench.js @@ -2,7 +2,7 @@ import { parseArgs } from "node:util"; import { randomBytes } from "node:crypto"; import { start } from "smoldot"; -import { loadChainSpec, spliceBootnodes } from "./chainspec.js"; +import { loadChainSpec } from "./chainspec.js"; import { getKeypair } from "./keypair.js"; import { SmoldotRpc } from "./smoldot-rpc.js"; import { runClient } from "./client.js"; @@ -48,7 +48,6 @@ function parseFlags(argv) { options: { "parachain-spec": { type: "string" }, "relay-chain-spec": { type: "string" }, - bootnodes: { type: "string" }, "false-positive-rate": { type: "string", default: "0.01" }, "num-clients": { type: "string", default: "100" }, "num-rounds": { type: "string", default: "1" }, @@ -76,9 +75,8 @@ function parseFlags(argv) { if (!(numRounds > 0)) throw new Error(`--num-rounds must be > 0`); return { - parachainSpecPath: values["parachain-spec"], - relayChainSpecPath: values["relay-chain-spec"], - bootnodes: values.bootnodes ? values.bootnodes.split(",").map((b) => b.trim()).filter(Boolean) : [], + parachainSpecSource: values["parachain-spec"], + relayChainSpecSource: values["relay-chain-spec"], falsePositiveRate: Number.parseFloat(values["false-positive-rate"]), numClients, numRounds, @@ -96,9 +94,8 @@ function logConfiguration(log, args) { const pattern = args.messagesPattern.map(([c, s]) => `${c}x${s}B`).join(", "); log.info( `Starting Statement Store Latency Benchmark: ` + - `parachain_spec=${args.parachainSpecPath} ` + - `relay_chain_spec=${args.relayChainSpecPath} ` + - `bootnodes=${args.bootnodes.length} ` + + `parachain_spec=${args.parachainSpecSource} ` + + `relay_chain_spec=${args.relayChainSpecSource} ` + `clients=${args.numClients} rounds=${args.numRounds} ` + `interval=${args.intervalMs}ms pattern=[${pattern}]`, ); @@ -152,9 +149,10 @@ async function main() { logConfiguration(log, args); - const parachainSpecRaw = await loadChainSpec(args.parachainSpecPath); - const parachainSpec = spliceBootnodes(parachainSpecRaw, args.bootnodes); - const relaySpec = await loadChainSpec(args.relayChainSpecPath); + const [parachainSpec, relaySpec] = await Promise.all([ + loadChainSpec(args.parachainSpecSource), + loadChainSpec(args.relayChainSpecSource), + ]); log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); diff --git a/examples/statement-latency-bench/chainspec.js b/examples/statement-latency-bench/chainspec.js index 4439f94131..d69d9b66be 100644 --- a/examples/statement-latency-bench/chainspec.js +++ b/examples/statement-latency-bench/chainspec.js @@ -1,16 +1,16 @@ import { readFile } from "node:fs/promises"; -export async function loadChainSpec(path) { - return await readFile(path, "utf8"); -} - -// `AddChainOptions.chainSpec` has no separate bootnodes field; bootnodes must -// be embedded in the spec JSON. We append rather than replace so any bootnodes -// already in the spec (e.g. public ones) keep working alongside CLI-provided ones. -export function spliceBootnodes(specJson, bootnodes) { - if (!bootnodes?.length) return specJson; - const spec = JSON.parse(specJson); - const existing = Array.isArray(spec.bootNodes) ? spec.bootNodes : []; - spec.bootNodes = [...existing, ...bootnodes]; - return JSON.stringify(spec); +// Accepts an http(s) URL or a local file path. Bootnodes are expected to be +// embedded in the spec already (the canonical source is the paritytech/chainspecs +// repo and smoldot's bundled demo-chain-specs/, both of which ship bootnodes +// inside the spec). +export async function loadChainSpec(source) { + if (/^https?:\/\//i.test(source)) { + const res = await fetch(source); + if (!res.ok) { + throw new Error(`Failed to fetch ${source}: ${res.status} ${res.statusText}`); + } + return await res.text(); + } + return await readFile(source, "utf8"); } From a62d88a676728bfe96e7c10909ec21da5f40a1f7 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 4 May 2026 16:19:06 +0200 Subject: [PATCH 04/12] Add run-local.sh, fork per-worker process, rename --shards to --workers --- examples/statement-latency-bench/.gitignore | 1 + examples/statement-latency-bench/README.md | 5 +- examples/statement-latency-bench/bench.js | 213 +++++++++++++++--- examples/statement-latency-bench/run-local.sh | 167 ++++++++++++++ 4 files changed, 357 insertions(+), 29 deletions(-) create mode 100755 examples/statement-latency-bench/run-local.sh diff --git a/examples/statement-latency-bench/.gitignore b/examples/statement-latency-bench/.gitignore index d8d980909e..2a18eb39e4 100644 --- a/examples/statement-latency-bench/.gitignore +++ b/examples/statement-latency-bench/.gitignore @@ -1,3 +1,4 @@ node_modules/ package-lock.json chain-specs/ +*.log diff --git a/examples/statement-latency-bench/README.md b/examples/statement-latency-bench/README.md index 13fe736b0b..aaede5eb2e 100644 --- a/examples/statement-latency-bench/README.md +++ b/examples/statement-latency-bench/README.md @@ -51,6 +51,7 @@ as `--parachain-spec`. | `--relay-chain-spec` | required | Relay chain raw chain spec — file path or `http(s)://` URL. | | `--false-positive-rate` | `0.01` | Statement-store affinity bloom filter rate. | | `--num-clients` | `100` | Each spawns its own smoldot instance. | +| `--workers` | `1` | If >1, fork that many child processes; clients distributed evenly. Use to scale past one event loop's capacity (~150 clients). | | `--num-rounds` | `1` | | | `--messages-pattern` | `5:512` | `count:size,count:size,…` | | `--receive-timeout-ms` | `5000` | | @@ -67,7 +68,9 @@ as `--parachain-spec`. `--rpc-endpoints` as the wiring point. - **One smoldot per client.** Smoldot dedupes within a client, so one peer per client requires one `start()` per client. Each is ~tens of MB resident; for - N=100 expect multi-GB RAM. Use the same K8s-shard pattern as the Rust bench. + N=100 expect multi-GB RAM. For larger N, use `--workers ` to fork K child + processes (each gets its own event loop and runs N/K clients), or follow the + same K8s-shard pattern as the Rust bench. - **No barrier.** Smoldot startup is variable (5–30s) and gossip readiness is not observable from JSON-RPC, so a barrier on a code line gives false confidence. We use a fixed `--warmup-ms` instead and pace rounds by wall clock. diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js index 0f8927dba9..18bb119c2e 100755 --- a/examples/statement-latency-bench/bench.js +++ b/examples/statement-latency-bench/bench.js @@ -1,6 +1,8 @@ #!/usr/bin/env node import { parseArgs } from "node:util"; import { randomBytes } from "node:crypto"; +import { fork } from "node:child_process"; +import { fileURLToPath } from "node:url"; import { start } from "smoldot"; import { loadChainSpec } from "./chainspec.js"; import { getKeypair } from "./keypair.js"; @@ -11,11 +13,11 @@ import { FailureKind, fail, reportResults } from "./stats.js"; const LEVEL = { ERROR: 1, WARN: 2, INFO: 3, DEBUG: 4, TRACE: 5 }; const LEVEL_LABEL = { 1: "ERROR", 2: "WARN", 3: "INFO", 4: "DEBUG", 5: "TRACE" }; -function makeLogger(maxLevel) { +function makeLogger(maxLevel, prefix = "") { const emit = (lvl, msg) => { if (lvl > maxLevel) return; const stream = lvl <= LEVEL.WARN ? process.stderr : process.stdout; - stream.write(`[${LEVEL_LABEL[lvl]}] ${msg}\n`); + stream.write(`${prefix}[${LEVEL_LABEL[lvl]}] ${msg}\n`); }; return { error: (msg) => emit(LEVEL.ERROR, msg), @@ -56,6 +58,7 @@ function parseFlags(argv) { "interval-ms": { type: "string", default: "10000" }, "statement-expiry-ms": { type: "string", default: "600000" }, "warmup-ms": { type: "string", default: "15000" }, + workers: { type: "string", default: "1" }, "fail-fast": { type: "boolean", default: false }, "log-level": { type: "string", default: "info" }, }, @@ -71,8 +74,11 @@ function parseFlags(argv) { const numClients = Number.parseInt(values["num-clients"], 10); const numRounds = Number.parseInt(values["num-rounds"], 10); + const workers = Number.parseInt(values["workers"], 10); if (!(numClients > 0)) throw new Error(`--num-clients must be > 0`); if (!(numRounds > 0)) throw new Error(`--num-rounds must be > 0`); + if (!(workers > 0)) throw new Error(`--workers must be > 0`); + if (workers > numClients) throw new Error(`--workers (${workers}) cannot exceed --num-clients (${numClients})`); return { parachainSpecSource: values["parachain-spec"], @@ -80,6 +86,7 @@ function parseFlags(argv) { falsePositiveRate: Number.parseFloat(values["false-positive-rate"]), numClients, numRounds, + workers, messagesPattern: parseMessagesPattern(values["messages-pattern"]), receiveTimeoutMs: Number.parseInt(values["receive-timeout-ms"], 10), intervalMs: Number.parseInt(values["interval-ms"], 10), @@ -97,14 +104,37 @@ function logConfiguration(log, args) { `parachain_spec=${args.parachainSpecSource} ` + `relay_chain_spec=${args.relayChainSpecSource} ` + `clients=${args.numClients} rounds=${args.numRounds} ` + + `workers=${args.workers} ` + `interval=${args.intervalMs}ms pattern=[${pattern}]`, ); } async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { + const debugClientId = + process.env.BENCH_DEBUG_CLIENT !== undefined + ? Number.parseInt(process.env.BENCH_DEBUG_CLIENT, 10) + : null; + const isDebugClient = debugClientId !== null && debugClientId === clientId; + const smoldot = start({ - maxLogLevel: 3, - logCallback: (lvl, target, msg) => log.forSmoldot(lvl, target, msg), + maxLogLevel: isDebugClient ? 4 : 2, + logCallback: (lvl, target, msg) => { + if (isDebugClient) { + log.forSmoldot(lvl, `c${clientId}/${target}`, msg); + return; + } + if ( + target.startsWith("json-rpc-") && + msg.includes("statement_subscribeStatement") + ) + return; + if ( + target.startsWith("sync-service-") && + msg.startsWith("Error while verifying justification") + ) + return; + log.forSmoldot(lvl, target, msg); + }, }); const relayChain = await smoldot.addChain({ chainSpec: relaySpec, disableJsonRpc: true }); @@ -119,11 +149,33 @@ async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { onUnexpected: (e) => log.warn(`client ${clientId} rpc: ${e.message}`), }); + // Optional periodic peer-health probe per client. Enable with + // BENCH_HEALTH_INTERVAL_MS=. Useful to compare succeeding vs + // failing clients' connectivity over the run. + const healthIntervalMs = Number.parseInt( + process.env.BENCH_HEALTH_INTERVAL_MS ?? "0", + 10, + ); + let healthTimer = null; + if (healthIntervalMs > 0) { + healthTimer = setInterval(async () => { + try { + const h = await rpc.request("system_health", []); + log.info( + `client ${clientId} health: peers=${h.peers} syncing=${h.isSyncing} shouldHavePeers=${h.shouldHavePeers}`, + ); + } catch (e) { + log.warn(`client ${clientId} health probe failed: ${e.message}`); + } + }, healthIntervalMs); + } + return { smoldot, chains, rpc, cleanup: async () => { + if (healthTimer) clearInterval(healthTimer); rpc.stop(); try { for (const c of chains) c.remove(); @@ -135,30 +187,19 @@ async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { }; } -async function main() { - let args; - try { - args = parseFlags(process.argv.slice(2)); - } catch (e) { - process.stderr.write(`Error: ${e.message}\n`); - process.exit(2); - } - - const log = makeLogger(args.logLevel); - const testRunId = randomBytes(8).readBigUInt64LE(0).toString(); - - logConfiguration(log, args); - +// Run a contiguous range [clientStart, clientEnd) of clients in this process. +// Used both by single-process mode (range = [0, numClients)) and by each +// child worker. +async function runWorker({ args, clientStart, clientEnd, testRunId, log }) { const [parachainSpec, relaySpec] = await Promise.all([ loadChainSpec(args.parachainSpecSource), loadChainSpec(args.relayChainSpecSource), ]); - log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); - const abortController = new AbortController(); const handles = []; - for (let clientId = 0; clientId < args.numClients; clientId++) { + + for (let clientId = clientStart; clientId < clientEnd; clientId++) { handles.push((async () => { let resources; try { @@ -219,18 +260,134 @@ async function main() { } const results = await Promise.all(handles); - const allSuccesses = []; - const allFailures = []; + const successes = []; + const failures = []; for (const r of results) { - allSuccesses.push(...r.successes); - allFailures.push(...r.failures); + successes.push(...r.successes); + failures.push(...r.failures); + } + return { successes, failures }; +} + +// Compute [start, end) for worker `i` of `total`, splitting `n` clients +// evenly with the remainder distributed across the first `n % total` workers. +function workerRange(i, total, n) { + const base = Math.floor(n / total); + const rem = n % total; + const start = i * base + Math.min(i, rem); + const size = base + (i < rem ? 1 : 0); + return [start, start + size]; +} + +async function runAsParent(args, log) { + const testRunId = randomBytes(8).readBigUInt64LE(0).toString(); + logConfiguration(log, args); + log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); + + if (args.workers === 1) { + const { successes, failures } = await runWorker({ + args, + clientStart: 0, + clientEnd: args.numClients, + testRunId, + log, + }); + reportResults(log, successes, failures, args.numClients, args.numRounds); + process.exit(failures.length > 0 && successes.length === 0 ? 1 : 0); + } + + // Multi-worker: fork one child process per worker. Each child runs its own + // event loop and its own slice of clients in-process. + const selfPath = fileURLToPath(import.meta.url); + const childResults = []; + + const children = []; + for (let i = 0; i < args.workers; i++) { + const [start, end] = workerRange(i, args.workers, args.numClients); + log.info(`Forking worker ${i + 1}/${args.workers}: clients [${start}, ${end})`); + const child = fork(selfPath, process.argv.slice(2), { + env: { + ...process.env, + BENCH_WORKER_ID: String(i), + BENCH_WORKER_COUNT: String(args.workers), + BENCH_TEST_RUN_ID: testRunId, + BENCH_CLIENT_START: String(start), + BENCH_CLIENT_END: String(end), + }, + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); + + children.push( + new Promise((resolve, reject) => { + let result = null; + child.on("message", (msg) => { + if (msg && msg.type === "result") result = msg; + }); + child.on("exit", (code) => { + if (result) { + childResults.push(result); + resolve(); + } else { + reject(new Error(`worker ${i} exited with code ${code} without sending a result`)); + } + }); + child.on("error", reject); + }), + ); } - reportResults(log, allSuccesses, allFailures, args.numClients, args.numRounds); + await Promise.all(children); - if (allFailures.length > 0 && allSuccesses.length === 0) { - process.exit(1); + const successes = []; + const failures = []; + for (const r of childResults) { + successes.push(...r.successes); + failures.push(...r.failures); } + reportResults(log, successes, failures, args.numClients, args.numRounds); + process.exit(failures.length > 0 && successes.length === 0 ? 1 : 0); +} + +async function runAsChild(args) { + const workerId = Number.parseInt(process.env.BENCH_WORKER_ID, 10); + const clientStart = Number.parseInt(process.env.BENCH_CLIENT_START, 10); + const clientEnd = Number.parseInt(process.env.BENCH_CLIENT_END, 10); + const testRunId = process.env.BENCH_TEST_RUN_ID; + + const log = makeLogger(args.logLevel, `[w${workerId}] `); + + const { successes, failures } = await runWorker({ + args, + clientStart, + clientEnd, + testRunId, + log, + }); + + if (typeof process.send === "function") { + await new Promise((resolve) => + process.send({ type: "result", successes, failures }, undefined, undefined, resolve), + ); + } + process.exit(0); +} + +async function main() { + let args; + try { + args = parseFlags(process.argv.slice(2)); + } catch (e) { + process.stderr.write(`Error: ${e.message}\n`); + process.exit(2); + } + + if (process.env.BENCH_WORKER_ID !== undefined) { + await runAsChild(args); + return; + } + + const log = makeLogger(args.logLevel); + await runAsParent(args, log); } main().catch((e) => { diff --git a/examples/statement-latency-bench/run-local.sh b/examples/statement-latency-bench/run-local.sh new file mode 100755 index 0000000000..5a1477bfab --- /dev/null +++ b/examples/statement-latency-bench/run-local.sh @@ -0,0 +1,167 @@ +#!/usr/bin/env bash +# Local helper: scrape chain specs + bootnodes out of a running zombienet +# (or polkadot-omni-node) network, then run the bench against it. +# Discovers ALL collators (and ALL relay validators) and adds each as a +# bootnode in the chain specs handed to smoldot. +set -euo pipefail + +cd "$(dirname "$0")" + +# Query a node's libp2p PeerId via the legacy `system_localPeerId` JSON-RPC. +peer_id_of() { + local rpc_port="$1" + curl -sS -m 5 -H 'Content-Type: application/json' \ + -d '{"jsonrpc":"2.0","method":"system_localPeerId","params":[],"id":1}' \ + "http://127.0.0.1:${rpc_port}" \ + | jq -r '.result // empty' +} + +# Extract the tcp port from a `--listen-addr` value like +# `/ip4/0.0.0.0/tcp/54115/ws`. +tcp_port_of_listen_addr() { + echo "$1" | sed -n 's,.*tcp/\([0-9][0-9]*\).*,\1,p' +} + +# --- discover running collators ---------------------------------------------- +echo "Looking for running polkadot-parachain / polkadot-omni-node processes ..." +COLLATORS=() +while IFS= read -r line; do + [ -n "$line" ] && COLLATORS+=("$line") +done < <(ps -axww -o command= \ + | grep -E 'polkadot-(parachain|omni-node)' \ + | grep -v grep || true) + +if [ "${#COLLATORS[@]}" -eq 0 ]; then + echo "Error: no polkadot-parachain/omni-node process found." >&2 + echo " Is your zombienet network up?" >&2 + exit 1 +fi + +PARACHAIN_SPEC="" +RELAY_SPEC="" +parachain_bootnodes=() + +for cmd in "${COLLATORS[@]}"; do + parachain_part=$(echo "$cmd" | sed 's/ -- .*//') + relay_part=$(echo "$cmd" | sed 's/.* -- //') + + [ -z "$PARACHAIN_SPEC" ] && \ + PARACHAIN_SPEC=$(echo "$parachain_part" | sed -n 's/.*--chain \([^ ]*\).*/\1/p; q') + [ -z "$RELAY_SPEC" ] && \ + RELAY_SPEC=$(echo "$relay_part" | sed -n 's/.*--chain \([^ ]*\).*/\1/p; q') + + rpc_port=$(echo "$parachain_part" | sed -n 's/.*--rpc-port \([0-9][0-9]*\).*/\1/p') + listen_addr=$(echo "$parachain_part" | sed -n 's/.*--listen-addr \([^ ]*\).*/\1/p') + p2p_port=$(tcp_port_of_listen_addr "$listen_addr") + + if [ -z "$rpc_port" ] || [ -z "$p2p_port" ]; then + echo " skipping collator (no rpc-port or listen-addr): $parachain_part" >&2 + continue + fi + + peer_id=$(peer_id_of "$rpc_port" || true) + if [ -z "$peer_id" ]; then + echo " skipping collator on rpc :$rpc_port (failed to fetch peer id)" >&2 + continue + fi + + parachain_bootnodes+=("/ip4/127.0.0.1/tcp/${p2p_port}/ws/p2p/${peer_id}") +done + +if [ -z "$PARACHAIN_SPEC" ] || [ -z "$RELAY_SPEC" ]; then + echo "Error: failed to extract --chain paths from collator commands." >&2 + exit 1 +fi + +# --- discover running relay validators --------------------------------------- +VALIDATORS=() +while IFS= read -r line; do + [ -n "$line" ] && VALIDATORS+=("$line") +done < <(ps -axww -o command= \ + | awk '/(^|\/)polkadot / && !/polkadot-(parachain|omni-node|prepare-worker|execute-worker)/' \ + | grep -v grep || true) + +relay_bootnodes=() +for cmd in "${VALIDATORS[@]}"; do + rpc_port=$(echo "$cmd" | sed -n 's/.*--rpc-port \([0-9][0-9]*\).*/\1/p') + listen_addr=$(echo "$cmd" | sed -n 's/.*--listen-addr \([^ ]*\).*/\1/p') + p2p_port=$(tcp_port_of_listen_addr "$listen_addr") + + if [ -z "$rpc_port" ] || [ -z "$p2p_port" ]; then + continue + fi + + peer_id=$(peer_id_of "$rpc_port" || true) + if [ -z "$peer_id" ]; then + continue + fi + + relay_bootnodes+=("/ip4/127.0.0.1/tcp/${p2p_port}/ws/p2p/${peer_id}") +done + +echo "Parachain spec : $PARACHAIN_SPEC" +echo "Relay spec : $RELAY_SPEC" +echo "Parachain bootnodes : ${#parachain_bootnodes[@]}" +if [ "${#parachain_bootnodes[@]}" -gt 0 ]; then + for n in "${parachain_bootnodes[@]}"; do echo " - $n"; done +fi +echo "Relay bootnodes : ${#relay_bootnodes[@]}" +if [ "${#relay_bootnodes[@]}" -gt 0 ]; then + for n in "${relay_bootnodes[@]}"; do echo " - $n"; done +fi + +# --- copy + patch specs ------------------------------------------------------ +mkdir -p chain-specs + +if [ "${#parachain_bootnodes[@]}" -gt 0 ]; then + para_bn_json=$(printf '%s\n' "${parachain_bootnodes[@]}" | jq -R . | jq -s .) +else + para_bn_json='[]' +fi +if [ "${#relay_bootnodes[@]}" -gt 0 ]; then + relay_bn_json=$(printf '%s\n' "${relay_bootnodes[@]}" | jq -R . | jq -s .) +else + relay_bn_json='[]' +fi + +# Parachain: id renamed to "parachain" so it's stable across runs (matches +# examples/statement-chat/dev.sh) and discovered bootnodes appended. +jq --argjson bn "$para_bn_json" \ + '.id = "parachain" | .bootNodes = ((.bootNodes // []) + $bn)' \ + "$PARACHAIN_SPEC" > chain-specs/parachain.json +echo "Wrote chain-specs/parachain.json" + +jq --argjson bn "$relay_bn_json" \ + '.bootNodes = ((.bootNodes // []) + $bn)' \ + "$RELAY_SPEC" > chain-specs/relay.json +echo "Wrote chain-specs/relay.json" + +# --- build smoldot if needed -------------------------------------------------- +if [ ! -d ../../wasm-node/javascript/dist ]; then + echo "" + echo "Building smoldot ..." + (cd ../../wasm-node/javascript && npm install && npm run build) +fi + +# --- npm install bench deps --------------------------------------------------- +if [ ! -d node_modules ]; then + echo "" + echo "Installing bench deps ..." + npm install +fi + +# --- run ---------------------------------------------------------------------- +echo "" +echo "Running bench ..." +exec node bench.js \ + --parachain-spec ./chain-specs/parachain.json \ + --relay-chain-spec ./chain-specs/relay.json \ + --num-clients "${NUM_CLIENTS:-4}" \ + --num-rounds "${NUM_ROUNDS:-2}" \ + --messages-pattern "${MESSAGES_PATTERN:-3:128}" \ + --receive-timeout-ms "${RECEIVE_TIMEOUT_MS:-30000}" \ + --interval-ms "${INTERVAL_MS:-5000}" \ + --warmup-ms "${WARMUP_MS:-15000}" \ + --workers "${WORKERS:-1}" \ + --log-level "${LOG_LEVEL:-info}" \ + "$@" From d219299494446023bda978f3f4d4f5b9e4c0728d Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 4 May 2026 16:47:48 +0200 Subject: [PATCH 05/12] Kill sibling workers on failure, drop jitter and decodeStatement, tidy comments --- examples/statement-latency-bench/README.md | 10 +- examples/statement-latency-bench/bench.js | 20 +++- examples/statement-latency-bench/client.js | 9 +- examples/statement-latency-bench/statement.js | 97 ++----------------- examples/statement-latency-bench/stats.js | 4 +- 5 files changed, 34 insertions(+), 106 deletions(-) diff --git a/examples/statement-latency-bench/README.md b/examples/statement-latency-bench/README.md index aaede5eb2e..b7f408cb08 100644 --- a/examples/statement-latency-bench/README.md +++ b/examples/statement-latency-bench/README.md @@ -76,7 +76,9 @@ as `--parachain-spec`. confidence. We use a fixed `--warmup-ms` instead and pace rounds by wall clock. - **`--seed` removed.** Always derives sr25519 keys via `//StatementClient//${idx}`, matching `sc_statement_store::test_utils::get_keypair`. -- **Output lines** otherwise match `bench.rs` verbatim so log parsers keep - working: `Starting Statement Store Latency Benchmark: …`, `Spawning {N} - client tasks... {testRunId}`, `Benchmark Results: send_min=…`, `Benchmark - Failed: failed_clients=…`, `Benchmark Finished: rounds_with_any_success=…`. +- **Output line prefixes** match `bench.rs` so log parsers keyed on the leading + tokens keep working: `Spawning {N} client tasks... {testRunId}`, `Benchmark + Results: send_min=…`, `Benchmark Failed: failed_clients=…`, `Benchmark + Finished: rounds_with_any_success=…`. The smoldot port also adds a + `Starting Statement Store Latency Benchmark:` config line and an `errors=[…]` + segment to `Benchmark Failed`, neither of which exists in `bench.rs`. diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js index 18bb119c2e..132f022681 100755 --- a/examples/statement-latency-bench/bench.js +++ b/examples/statement-latency-bench/bench.js @@ -301,7 +301,8 @@ async function runAsParent(args, log) { const selfPath = fileURLToPath(import.meta.url); const childResults = []; - const children = []; + const childProcs = []; + const childPromises = []; for (let i = 0; i < args.workers; i++) { const [start, end] = workerRange(i, args.workers, args.numClients); log.info(`Forking worker ${i + 1}/${args.workers}: clients [${start}, ${end})`); @@ -316,8 +317,9 @@ async function runAsParent(args, log) { }, stdio: ["inherit", "inherit", "inherit", "ipc"], }); + childProcs.push(child); - children.push( + childPromises.push( new Promise((resolve, reject) => { let result = null; child.on("message", (msg) => { @@ -336,7 +338,19 @@ async function runAsParent(args, log) { ); } - await Promise.all(children); + // If any worker fails, kill the rest so they don't outlive the parent + // holding smoldot peers / sockets. + try { + await Promise.all(childPromises); + } catch (e) { + for (const c of childProcs) { + if (c.exitCode === null && c.signalCode === null) { + try { c.kill("SIGTERM"); } catch {} + } + } + await Promise.allSettled(childPromises); + throw e; + } const successes = []; const failures = []; diff --git a/examples/statement-latency-bench/client.js b/examples/statement-latency-bench/client.js index 735ccbd4c7..2295b28be5 100644 --- a/examples/statement-latency-bench/client.js +++ b/examples/statement-latency-bench/client.js @@ -3,7 +3,8 @@ import { u8aToHex } from "@polkadot/util"; import { encodeStatement, expiryFromParts } from "./statement.js"; import { FailureKind, fail } from "./stats.js"; -const MAX_TOPICS = 128; // bench.rs:249 — BoundedVec> on the subscribe side +// BoundedVec> on the statement_subscribeStatement filter side. +const MAX_TOPICS = 128; const enc = new TextEncoder(); @@ -104,7 +105,7 @@ async function executeRound({ round, config, rpc, pair, log }) { const expiryTs = Math.floor((Date.now() + statementExpiryMs) / 1000); const sequence = (sentCount + 1) * round; const expiry = expiryFromParts(expiryTs, sequence); - const data = new Uint8Array(size); // zero-filled; bench.rs:283 does the same + const data = new Uint8Array(size); // zero-filled, matching the Rust bench const hex = encodeStatement({ pair, expiry, channel, topic, data }); @@ -209,10 +210,6 @@ export async function runClient({ config, rpc, pair, abortSignal, log }) { const successes = []; const failures = []; - // Apply jitter to distribute submission load (bench.rs:423-424). - const submissionJitter = (config.clientId * 7) % 1000; - await sleep(submissionJitter, abortSignal).catch(() => {}); - for (let round = 1; round <= config.numRounds; round++) { if (abortSignal?.aborted) { failures.push(FailureKind.PeerFailed); diff --git a/examples/statement-latency-bench/statement.js b/examples/statement-latency-bench/statement.js index 74fe888775..6c0a46a512 100644 --- a/examples/statement-latency-bench/statement.js +++ b/examples/statement-latency-bench/statement.js @@ -1,18 +1,13 @@ -import { compactToU8a, compactFromU8a, u8aConcat, u8aToHex, hexToU8a } from "@polkadot/util"; +import { compactToU8a, u8aConcat, u8aToHex } from "@polkadot/util"; -// Field discriminants (lib/src/network/codec/statement.rs:35-46). +// Field discriminants (lib/src/network/codec/statement.rs). const FIELD_PROOF = 0; -const FIELD_DECRYPTION_KEY = 1; const FIELD_EXPIRY = 2; const FIELD_CHANNEL = 3; const FIELD_TOPIC_START = 4; -const FIELD_TOPIC_END = 7; const FIELD_DATA = 8; const PROOF_SR25519 = 0; -const PROOF_ED25519 = 1; -const PROOF_SECP256K1_ECDSA = 2; -const PROOF_ON_CHAIN = 3; function u64ToLeBytes(n) { const out = new Uint8Array(8); @@ -21,10 +16,6 @@ function u64ToLeBytes(n) { return out; } -function leBytesToU64(bytes) { - return new DataView(bytes.buffer, bytes.byteOffset, 8).getBigUint64(0, true); -} - // Pack (timestamp_secs, sequence) into the u64 expiry per // sp_statement_store::Statement::set_expiry_from_parts: // expiry = (timestamp_secs as u64) << 32 | sequence as u64 @@ -33,10 +24,9 @@ export function expiryFromParts(timestampSecs, sequence) { } // Build the bytes signed over by the proof, mirroring -// sp_statement_store::Statement::encoded(for_signing=true) in polkadot-sdk -// (substrate/primitives/statement-store/src/lib.rs:736-780): no leading -// field-count compact, no proof field, everything else in ascending discriminant -// order. +// sp_statement_store::Statement::encoded(for_signing=true) in polkadot-sdk: +// no leading field-count compact, no proof field, everything else in ascending +// discriminant order. function buildSignatureMaterial({ expiry, channel, topic, data }) { const parts = []; @@ -60,7 +50,7 @@ function buildSignatureMaterial({ expiry, channel, topic, data }) { // Encode a full sr25519-signed statement on the wire. // -// Wire format (matches both lib/src/network/codec/statement.rs:269-325 and +// Wire format (matches lib/src/network/codec/statement.rs and // sp_statement_store::Statement::encoded(false)): SCALE-compact field count, // then ascending (discriminant, value) pairs. export function encodeStatement({ pair, expiry, channel, topic, data }) { @@ -99,78 +89,3 @@ export function encodeStatement({ pair, expiry, channel, topic, data }) { return u8aToHex(u8aConcat(...parts)); } -// Best-effort decode for inspection / sanity checks. Skips fields we don't need -// in detail; pulls out proof signer, topic(s), and data. -export function decodeStatement(hex) { - const bytes = hexToU8a(hex); - let offset = 0; - - const [fieldCountSize, numFieldsBn] = compactFromU8a(bytes.subarray(offset)); - offset += fieldCountSize; - const numFields = numFieldsBn.toNumber(); - - let proof = null; - const topics = []; - let data = null; - let channel = null; - let expiry = null; - - for (let i = 0; i < numFields; i++) { - const disc = bytes[offset++]; - switch (disc) { - case FIELD_PROOF: { - const proofType = bytes[offset++]; - if (proofType === PROOF_SR25519 || proofType === PROOF_ED25519) { - const signature = bytes.slice(offset, offset + 64); - offset += 64; - const signer = bytes.slice(offset, offset + 32); - offset += 32; - proof = { - type: proofType === PROOF_SR25519 ? "Sr25519" : "Ed25519", - signature, - signer, - }; - } else if (proofType === PROOF_SECP256K1_ECDSA) { - offset += 65 + 33; - proof = { type: "Secp256k1Ecdsa" }; - } else if (proofType === PROOF_ON_CHAIN) { - offset += 32 + 32 + 8; - proof = { type: "OnChain" }; - } else { - throw new Error(`Unknown proof type: ${proofType}`); - } - break; - } - case FIELD_DECRYPTION_KEY: - offset += 32; - break; - case FIELD_EXPIRY: - expiry = leBytesToU64(bytes.slice(offset, offset + 8)); - offset += 8; - break; - case FIELD_CHANNEL: - channel = bytes.slice(offset, offset + 32); - offset += 32; - break; - case FIELD_TOPIC_START: - case FIELD_TOPIC_START + 1: - case FIELD_TOPIC_START + 2: - case FIELD_TOPIC_END: - topics.push(bytes.slice(offset, offset + 32)); - offset += 32; - break; - case FIELD_DATA: { - const [lenSize, lenBn] = compactFromU8a(bytes.subarray(offset)); - const len = lenBn.toNumber(); - offset += lenSize; - data = bytes.slice(offset, offset + len); - offset += len; - break; - } - default: - throw new Error(`Unknown field discriminant: ${disc}`); - } - } - - return { proof, expiry, channel, topics, data }; -} diff --git a/examples/statement-latency-bench/stats.js b/examples/statement-latency-bench/stats.js index 18a24a0284..4bd7f0a698 100644 --- a/examples/statement-latency-bench/stats.js +++ b/examples/statement-latency-bench/stats.js @@ -1,4 +1,4 @@ -// Closed set of failure categories. Mirrors bench.rs:118-129. +// Closed set of failure categories, mirroring the Rust bench's FailureKind enum. export const FailureKind = { TooManyTopics: "Too many topics", SubscribeFailed: "Failed to open RPC subscription", @@ -11,7 +11,7 @@ export const FailureKind = { }; // Logs the failure with bench.rs's wording and returns the kind so callers can -// `return fail(...)`. Mirrors bench.rs:148-161. +// `return fail(...)`. export function fail(log, clientId, roundInfo, kind, detail) { const prefix = roundInfo ? `Round ${roundInfo[0]}/${roundInfo[1]}: ` : ""; log.warn(`Client ${clientId}: ${prefix}${kind} (${detail})`); From f7a4177986fc48a27214a2c86706a409d4c2ec6e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 4 May 2026 16:52:03 +0200 Subject: [PATCH 06/12] Add PR workflow to build/push paritypr/smoldot debug image --- .github/workflows/docker-pr.yml | 36 +++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 .github/workflows/docker-pr.yml diff --git a/.github/workflows/docker-pr.yml b/.github/workflows/docker-pr.yml new file mode 100644 index 0000000000..ca6f638c9e --- /dev/null +++ b/.github/workflows/docker-pr.yml @@ -0,0 +1,36 @@ +name: docker-pr + +on: + pull_request: + types: [opened, synchronize, reopened, labeled] + +permissions: + contents: read + +jobs: + build-push: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + ref: ${{ github.event.pull_request.head.sha }} + + - id: meta + run: | + echo "tag=${{ github.event.pull_request.number }}-$(git rev-parse --short=8 HEAD)" >> "$GITHUB_OUTPUT" + + - uses: docker/setup-qemu-action@v3 + - uses: docker/setup-buildx-action@v3 + + - if: contains(github.event.pull_request.labels.*.name, 'push-debug-image') + uses: docker/login-action@v3 + with: + username: ${{ secrets.PARITYPR_DOCKERHUB_USERNAME }} + password: ${{ secrets.PARITYPR_DOCKERHUB_PASSWORD }} + + - uses: docker/build-push-action@v6.18.0 + with: + context: . + file: ./full-node/Dockerfile + push: ${{ contains(github.event.pull_request.labels.*.name, 'push-debug-image') }} + tags: paritypr/smoldot:${{ steps.meta.outputs.tag }} From ef15e51b5a65efe61d43c7bdb2015c9068d682b0 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 5 May 2026 17:01:36 +0200 Subject: [PATCH 07/12] Add Dockerfile for statement-latency-bench and wire it into docker-pr workflow --- .github/workflows/docker-pr.yml | 2 +- examples/statement-latency-bench/Dockerfile | 28 +++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 examples/statement-latency-bench/Dockerfile diff --git a/.github/workflows/docker-pr.yml b/.github/workflows/docker-pr.yml index ca6f638c9e..1ac47aacc0 100644 --- a/.github/workflows/docker-pr.yml +++ b/.github/workflows/docker-pr.yml @@ -31,6 +31,6 @@ jobs: - uses: docker/build-push-action@v6.18.0 with: context: . - file: ./full-node/Dockerfile + file: ./examples/statement-latency-bench/Dockerfile push: ${{ contains(github.event.pull_request.labels.*.name, 'push-debug-image') }} tags: paritypr/smoldot:${{ steps.meta.outputs.tag }} diff --git a/examples/statement-latency-bench/Dockerfile b/examples/statement-latency-bench/Dockerfile new file mode 100644 index 0000000000..5d9b7aecea --- /dev/null +++ b/examples/statement-latency-bench/Dockerfile @@ -0,0 +1,28 @@ +# Build & run the statement-latency-bench Node.js tool. +# +# The bench depends on the local `smoldot` npm package +# (wasm-node/javascript), whose `prepack` step compiles Rust → wasm. +# So the builder stage needs rust + the wasm32 target + node. + +FROM rust:1 AS builder + +RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates \ + && curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ + && apt-get install -y --no-install-recommends nodejs \ + && rm -rf /var/lib/apt/lists/* + +RUN rustup target add wasm32-unknown-unknown + +COPY ./.. /build +WORKDIR /build/examples/statement-latency-bench +RUN npm install --omit=dev + + +FROM alpine:latest + +RUN apk add --no-cache nodejs + +WORKDIR /app +COPY --from=builder /build/examples/statement-latency-bench /app + +ENTRYPOINT ["node", "/app/bench.js"] From e078f29b8531eb6c65333b910e3023d44c10e020 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 7 May 2026 13:13:21 +0200 Subject: [PATCH 08/12] Synchronize round 1 start across all clients via warmup barrier --- examples/statement-latency-bench/bench.js | 60 ++++++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js index 132f022681..e97207166b 100755 --- a/examples/statement-latency-bench/bench.js +++ b/examples/statement-latency-bench/bench.js @@ -190,7 +190,12 @@ async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { // Run a contiguous range [clientStart, clientEnd) of clients in this process. // Used both by single-process mode (range = [0, numClients)) and by each // child worker. -async function runWorker({ args, clientStart, clientEnd, testRunId, log }) { +// +// `barrier` synchronises the start of round 1 across all clients (and across +// workers when multi-worker). After warmup each client calls `barrier.arrive()` +// then awaits `barrier.waitStart()` before entering the round loop. Subsequent +// rounds are paced independently by `intervalMs` in `runClient`. +async function runWorker({ args, clientStart, clientEnd, testRunId, log, barrier }) { const [parachainSpec, relaySpec] = await Promise.all([ loadChainSpec(args.parachainSpecSource), loadChainSpec(args.relayChainSpecSource), @@ -233,6 +238,13 @@ async function runWorker({ args, clientStart, clientEnd, testRunId, log }) { // --warmup-ms per network. await new Promise((r) => setTimeout(r, args.warmupMs)); + // Synchronise round 1 across all clients globally. Without this each + // client enters round 1 as soon as its own warmup completes, drifts on + // round-1 receive timeouts, and senders end up emitting statements for + // round N+k while receivers are still subscribed for round N. + barrier.arrive(); + await barrier.waitStart(); + const result = await runClient({ config, rpc: resources.rpc, @@ -279,18 +291,41 @@ function workerRange(i, total, n) { return [start, start + size]; } +// Barrier that releases once all `expected` participants have arrived. In +// multi-worker mode, the parent uses one of these to gate the children, and +// each child uses one of these locally over its own clients before reporting +// "ready" up to the parent. +function makeBarrier(expected, onAllArrived) { + let arrived = 0; + let releaseStart; + const startPromise = new Promise((r) => { releaseStart = r; }); + return { + arrive: () => { + arrived += 1; + if (arrived === expected && onAllArrived) onAllArrived(); + }, + waitStart: () => startPromise, + release: () => releaseStart(), + }; +} + async function runAsParent(args, log) { const testRunId = randomBytes(8).readBigUInt64LE(0).toString(); logConfiguration(log, args); log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); if (args.workers === 1) { + const barrier = makeBarrier(args.numClients, () => { + log.info(`All ${args.numClients} clients ready; starting round 1`); + barrier.release(); + }); const { successes, failures } = await runWorker({ args, clientStart: 0, clientEnd: args.numClients, testRunId, log, + barrier, }); reportResults(log, successes, failures, args.numClients, args.numRounds); process.exit(failures.length > 0 && successes.length === 0 ? 1 : 0); @@ -303,6 +338,13 @@ async function runAsParent(args, log) { const childProcs = []; const childPromises = []; + // Round-1 barrier across workers: each child sends `worker-ready` once all + // its local clients have warmed up; once every child has reported, the + // parent broadcasts `start` and all clients begin round 1 simultaneously. + const workerBarrier = makeBarrier(args.workers, () => { + log.info(`All ${args.workers} workers ready; starting round 1`); + for (const c of childProcs) c.send({ type: "start" }); + }); for (let i = 0; i < args.workers; i++) { const [start, end] = workerRange(i, args.workers, args.numClients); log.info(`Forking worker ${i + 1}/${args.workers}: clients [${start}, ${end})`); @@ -323,7 +365,9 @@ async function runAsParent(args, log) { new Promise((resolve, reject) => { let result = null; child.on("message", (msg) => { - if (msg && msg.type === "result") result = msg; + if (!msg) return; + if (msg.type === "worker-ready") workerBarrier.arrive(); + else if (msg.type === "result") result = msg; }); child.on("exit", (code) => { if (result) { @@ -370,12 +414,24 @@ async function runAsChild(args) { const log = makeLogger(args.logLevel, `[w${workerId}] `); + // Local barrier across this worker's clients: when all have warmed up, tell + // the parent. Then wait for the parent's `start` broadcast (issued once + // every worker has reported ready) and release the local clients. + const localCount = clientEnd - clientStart; + const barrier = makeBarrier(localCount, () => { + process.send({ type: "worker-ready" }); + }); + process.on("message", (msg) => { + if (msg && msg.type === "start") barrier.release(); + }); + const { successes, failures } = await runWorker({ args, clientStart, clientEnd, testRunId, log, + barrier, }); if (typeof process.send === "function") { From 998bc07144ff19ec47f02a8398f607d67989520e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 8 May 2026 10:59:08 +0200 Subject: [PATCH 09/12] Add bash to image --- examples/statement-latency-bench/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/statement-latency-bench/Dockerfile b/examples/statement-latency-bench/Dockerfile index 5d9b7aecea..a5aaaac90f 100644 --- a/examples/statement-latency-bench/Dockerfile +++ b/examples/statement-latency-bench/Dockerfile @@ -20,7 +20,7 @@ RUN npm install --omit=dev FROM alpine:latest -RUN apk add --no-cache nodejs +RUN apk add --no-cache nodejs bash WORKDIR /app COPY --from=builder /build/examples/statement-latency-bench /app From b664511b8367e160c375c740775bad81274708ed Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 8 May 2026 11:48:04 +0200 Subject: [PATCH 10/12] Install links --- examples/statement-latency-bench/Dockerfile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/statement-latency-bench/Dockerfile b/examples/statement-latency-bench/Dockerfile index a5aaaac90f..2562a9e373 100644 --- a/examples/statement-latency-bench/Dockerfile +++ b/examples/statement-latency-bench/Dockerfile @@ -15,7 +15,10 @@ RUN rustup target add wasm32-unknown-unknown COPY ./.. /build WORKDIR /build/examples/statement-latency-bench -RUN npm install --omit=dev +# --install-links materializes the `file:../../wasm-node/javascript` +# dep as a real copy in node_modules instead of a symlink, so the +# runtime stage can be a self-contained copy of just this directory. +RUN npm install --omit=dev --install-links FROM alpine:latest From 31203f1da832d5a84bcdd5b08ec0732d36adaaf5 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 8 May 2026 12:39:56 +0200 Subject: [PATCH 11/12] Update dockerfile --- examples/statement-latency-bench/Dockerfile | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/examples/statement-latency-bench/Dockerfile b/examples/statement-latency-bench/Dockerfile index 2562a9e373..640f624f3c 100644 --- a/examples/statement-latency-bench/Dockerfile +++ b/examples/statement-latency-bench/Dockerfile @@ -14,10 +14,19 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certifi RUN rustup target add wasm32-unknown-unknown COPY ./.. /build -WORKDIR /build/examples/statement-latency-bench + +# Build the smoldot npm package first: `npm run build` runs +# prepare.mjs (compiles Rust → wasm) then tsc, producing dist/. +# A plain `file:` install of the bench would NOT run these scripts. +WORKDIR /build/wasm-node/javascript +RUN npm install +RUN npm run build + # --install-links materializes the `file:../../wasm-node/javascript` -# dep as a real copy in node_modules instead of a symlink, so the -# runtime stage can be a self-contained copy of just this directory. +# dep as a real copy (including the freshly built dist/) in +# node_modules instead of a symlink, so the runtime stage can be a +# self-contained copy of just this directory. +WORKDIR /build/examples/statement-latency-bench RUN npm install --omit=dev --install-links From e8fb7732d5e73f2c9091c3fd29f1e906d326e79e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 8 May 2026 16:48:51 +0200 Subject: [PATCH 12/12] Replace fixed warmup with system_health poll, default cap 120s --- examples/statement-latency-bench/bench.js | 28 +++++++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js index e97207166b..b2731737a1 100755 --- a/examples/statement-latency-bench/bench.js +++ b/examples/statement-latency-bench/bench.js @@ -57,7 +57,7 @@ function parseFlags(argv) { "receive-timeout-ms": { type: "string", default: "5000" }, "interval-ms": { type: "string", default: "10000" }, "statement-expiry-ms": { type: "string", default: "600000" }, - "warmup-ms": { type: "string", default: "15000" }, + "warmup-ms": { type: "string", default: "120000" }, workers: { type: "string", default: "1" }, "fail-fast": { type: "boolean", default: false }, "log-level": { type: "string", default: "info" }, @@ -233,10 +233,28 @@ async function runWorker({ args, clientStart, clientEnd, testRunId, log, barrier failFast: args.failFast, }; - // Best-effort warm-up: smoldot has no JSON-RPC signal for "I have a - // statement-store peer". A fixed sleep is a coarse heuristic; tune via - // --warmup-ms per network. - await new Promise((r) => setTimeout(r, args.warmupMs)); + // Warm-up: smoldot has no JSON-RPC signal for "I have a statement-store + // peer", but addChain returns long before the chain has finalized its + // first block, and statement_submitStatement just hangs silently on an + // unbootstrapped parachain. Poll system_health as a proxy and proceed + // once peers > 0 && !isSyncing, capped at --warmup-ms as a safety net. + const warmupDeadline = Date.now() + args.warmupMs; + let warmupReady = false; + while (Date.now() < warmupDeadline) { + try { + const h = await resources.rpc.request("system_health", []); + if (h && h.peers > 0 && h.isSyncing === false) { + warmupReady = true; + break; + } + } catch {} + await new Promise((r) => setTimeout(r, 1000)); + } + if (!warmupReady) { + log.warn( + `Client ${clientId}: warmup deadline (${args.warmupMs}ms) reached without system_health ready; proceeding anyway`, + ); + } // Synchronise round 1 across all clients globally. Without this each // client enters round 1 as soon as its own warmup completes, drifts on