diff --git a/.github/workflows/docker-pr.yml b/.github/workflows/docker-pr.yml
new file mode 100644
index 0000000000..1ac47aacc0
--- /dev/null
+++ b/.github/workflows/docker-pr.yml
@@ -0,0 +1,36 @@
+name: docker-pr
+
+on:
+  pull_request:
+    types: [opened, synchronize, reopened, labeled]
+
+permissions:
+  contents: read
+
+jobs:
+  build-push:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+
+      - id: meta
+        run: |
+          echo "tag=${{ github.event.pull_request.number }}-$(git rev-parse --short=8 HEAD)" >> "$GITHUB_OUTPUT"
+
+      - uses: docker/setup-qemu-action@v3
+      - uses: docker/setup-buildx-action@v3
+
+      - if: contains(github.event.pull_request.labels.*.name, 'push-debug-image')
+        uses: docker/login-action@v3
+        with:
+          username: ${{ secrets.PARITYPR_DOCKERHUB_USERNAME }}
+          password: ${{ secrets.PARITYPR_DOCKERHUB_PASSWORD }}
+
+      - uses: docker/build-push-action@v6.18.0
+        with:
+          context: .
+          file: ./examples/statement-latency-bench/Dockerfile
+          push: ${{ contains(github.event.pull_request.labels.*.name, 'push-debug-image') }}
+          tags: paritypr/smoldot:${{ steps.meta.outputs.tag }}
diff --git a/examples/statement-latency-bench/.gitignore b/examples/statement-latency-bench/.gitignore
new file mode 100644
index 0000000000..2a18eb39e4
--- /dev/null
+++ b/examples/statement-latency-bench/.gitignore
@@ -0,0 +1,4 @@
+node_modules/
+package-lock.json
+chain-specs/
+*.log
diff --git a/examples/statement-latency-bench/Dockerfile b/examples/statement-latency-bench/Dockerfile
new file mode 100644
index 0000000000..640f624f3c
--- /dev/null
+++ b/examples/statement-latency-bench/Dockerfile
@@ -0,0 +1,40 @@
+# Build & run the statement-latency-bench Node.js tool.
+
+# The bench depends on the local `smoldot` npm package
+# (wasm-node/javascript), whose `prepack` step compiles Rust → wasm.
+# So the builder stage needs rust + the wasm32 target + node.
+
+FROM rust:1 AS builder
+
+RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates \
+    && curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
+    && apt-get install -y --no-install-recommends nodejs \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN rustup target add wasm32-unknown-unknown
+
+COPY ./.. /build
+
+# Build the smoldot npm package first: `npm run build` runs
+# prepare.mjs (compiles Rust → wasm) then tsc, producing dist/.
+# A plain `file:` install of the bench would NOT run these scripts.
+WORKDIR /build/wasm-node/javascript
+RUN npm install
+RUN npm run build
+
+# --install-links materializes the `file:../../wasm-node/javascript`
+# dep as a real copy (including the freshly built dist/) in
+# node_modules instead of a symlink, so the runtime stage can be a
+# self-contained copy of just this directory.
+WORKDIR /build/examples/statement-latency-bench
+RUN npm install --omit=dev --install-links
+
+
+FROM alpine:latest
+
+RUN apk add --no-cache nodejs bash
+
+WORKDIR /app
+COPY --from=builder /build/examples/statement-latency-bench /app
+
+ENTRYPOINT ["node", "/app/bench.js"]
diff --git a/examples/statement-latency-bench/README.md b/examples/statement-latency-bench/README.md
new file mode 100644
index 0000000000..b7f408cb08
--- /dev/null
+++ b/examples/statement-latency-bench/README.md
@@ -0,0 +1,84 @@
+# statement-latency-bench
+
+Smoldot-based Node.js port of `polkadot-sdk/substrate/client/statement-store/statement-latency-bench`.
+
+Each virtual client runs an in-process smoldot light client and joins the
+statement-store libp2p network as its own peer. There is no `--rpc-endpoints`:
+smoldot is a peer, not an RPC client of a node.
+
+## Build smoldot first
+
+```bash
+cd ../../wasm-node/javascript
+npm install && npm run build
+cd -
+npm install
+```
+
+## Usage
+
+`--parachain-spec` and `--relay-chain-spec` each accept either a local file
+path or an `http(s)://` URL. Bootnodes are expected to be embedded in the
+spec; the canonical source is the [paritytech/chainspecs](https://github.com/paritytech/chainspecs)
+repo, and smoldot bundles popular ones under `../../demo-chain-specs/`.
+
+```bash
+# bundled spec from the smoldot repo
+node bench.js \
+  --parachain-spec ../../demo-chain-specs/polkadot_asset_hub.json \
+  --relay-chain-spec ../../demo-chain-specs/polkadot.json \
+  --num-clients 4 --num-rounds 2 --messages-pattern "3:128" \
+  --receive-timeout-ms 30000 --interval-ms 5000
+
+# raw URL straight from polkadot-sdk (paritytech/chainspecs is a symlink-only
+# index repo; raw.githubusercontent doesn't follow its symlinks across submodules,
+# so point at the underlying files directly)
+node bench.js \
+  --parachain-spec https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/cumulus/parachains/chain-specs/asset-hub-polkadot.json \
+  --relay-chain-spec https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/polkadot/node/service/chain-specs/polkadot.json \
+  --num-clients 4 --num-rounds 2 --messages-pattern "3:128"
+```
+
+For a local dev network, splice your bootnode into the parachain spec once
+(e.g. with `jq`, like `examples/statement-chat/dev.sh`) and pass the result
+as `--parachain-spec`.
+
+## Flags
+
+| Flag | Default | Notes |
+|---|---|---|
+| `--parachain-spec` | required | Parachain raw chain spec — file path or `http(s)://` URL. |
+| `--relay-chain-spec` | required | Relay chain raw chain spec — file path or `http(s)://` URL. |
+| `--false-positive-rate` | `0.01` | Statement-store affinity bloom filter rate. |
+| `--num-clients` | `100` | Each spawns its own smoldot instance. |
+| `--workers` | `1` | If >1, fork that many child processes; clients distributed evenly. Use to scale past one event loop's capacity (~150 clients). |
+| `--num-rounds` | `1` | |
+| `--messages-pattern` | `5:512` | `count:size,count:size,…` |
+| `--receive-timeout-ms` | `5000` | |
+| `--interval-ms` | `10000` | Wall-clock pacing between rounds. |
+| `--statement-expiry-ms` | `600000` | |
+| `--warmup-ms` | `120000` | Cap on the `system_health` warmup poll after `addChain`, before round 1. |
+| `--fail-fast` | `false` | First failure aborts all clients via AbortController. |
+| `--log-level` | `info` | error/warn/info/debug/trace |
+
+## Differences from `bench.rs`
+
+- **Connection model.** Rust connects to N WS endpoints; smoldot joins the
+  network directly. Chain specs (with embedded bootnodes) replace
+  `--rpc-endpoints` as the wiring point.
+- **One smoldot per client.** Smoldot dedupes within a client, so one peer per
+  client requires one `start()` per client. Each is ~tens of MB resident; for
+  N=100 expect multi-GB RAM. For larger N, use `--workers K` to fork K child
+  processes (each gets its own event loop and runs N/K clients), or follow the
+  same K8s-shard pattern as the Rust bench.
+- **Readiness handling.** Smoldot startup is variable (5–30s) and gossip
+  readiness is not observable from JSON-RPC; clients poll `system_health`
+  (capped at `--warmup-ms`), then a start barrier aligns round 1 globally.
+- **`--seed` removed.** Always derives sr25519 keys via `//StatementClient//${idx}`,
+  matching `sc_statement_store::test_utils::get_keypair`.
+- **Output line prefixes** match `bench.rs` so log parsers keyed on the leading
+  tokens keep working: `Spawning {N} client tasks... {testRunId}`, `Benchmark
+  Results: send_min=…`, `Benchmark Failed: failed_clients=…`, `Benchmark
+  Finished: rounds_with_any_success=…`. The smoldot port also adds a
+  `Starting Statement Store Latency Benchmark:` config line and an `errors=[…]`
+  segment to `Benchmark Failed`, neither of which exists in `bench.rs`.
diff --git a/examples/statement-latency-bench/bench.js b/examples/statement-latency-bench/bench.js new file mode 100755 index 0000000000..b2731737a1 --- /dev/null +++ b/examples/statement-latency-bench/bench.js @@ -0,0 +1,484 @@ +#!/usr/bin/env node +import { parseArgs } from "node:util"; +import { randomBytes } from "node:crypto"; +import { fork } from "node:child_process"; +import { fileURLToPath } from "node:url"; +import { start } from "smoldot"; +import { loadChainSpec } from "./chainspec.js"; +import { getKeypair } from "./keypair.js"; +import { SmoldotRpc } from "./smoldot-rpc.js"; +import { runClient } from "./client.js"; +import { FailureKind, fail, reportResults } from "./stats.js"; + +const LEVEL = { ERROR: 1, WARN: 2, INFO: 3, DEBUG: 4, TRACE: 5 }; +const LEVEL_LABEL = { 1: "ERROR", 2: "WARN", 3: "INFO", 4: "DEBUG", 5: "TRACE" }; + +function makeLogger(maxLevel, prefix = "") { + const emit = (lvl, msg) => { + if (lvl > maxLevel) return; + const stream = lvl <= LEVEL.WARN ? process.stderr : process.stdout; + stream.write(`${prefix}[${LEVEL_LABEL[lvl]}] ${msg}\n`); + }; + return { + error: (msg) => emit(LEVEL.ERROR, msg), + warn: (msg) => emit(LEVEL.WARN, msg), + info: (msg) => emit(LEVEL.INFO, msg), + debug: (msg) => emit(LEVEL.DEBUG, msg), + trace: (msg) => emit(LEVEL.TRACE, msg), + forSmoldot: (lvl, target, message) => emit(lvl, `[${target}] ${message}`), + }; +} + +function parseMessagesPattern(pattern) { + return pattern.split(",").map((part) => { + const [c, s] = part.trim().split(":"); + if (c === undefined || s === undefined) { + throw new Error(`Invalid pattern '${part}'. 
Expected 'count:size'`); + } + const count = Number.parseInt(c, 10); + const size = Number.parseInt(s, 10); + if (!Number.isFinite(count) || !Number.isFinite(size)) { + throw new Error(`Invalid count/size in pattern '${part}'`); + } + return [count, size]; + }); +} + +function parseFlags(argv) { + const { values } = parseArgs({ + args: argv, + options: { + "parachain-spec": { type: "string" }, + "relay-chain-spec": { type: "string" }, + "false-positive-rate": { type: "string", default: "0.01" }, + "num-clients": { type: "string", default: "100" }, + "num-rounds": { type: "string", default: "1" }, + "messages-pattern": { type: "string", default: "5:512" }, + "receive-timeout-ms": { type: "string", default: "5000" }, + "interval-ms": { type: "string", default: "10000" }, + "statement-expiry-ms": { type: "string", default: "600000" }, + "warmup-ms": { type: "string", default: "120000" }, + workers: { type: "string", default: "1" }, + "fail-fast": { type: "boolean", default: false }, + "log-level": { type: "string", default: "info" }, + }, + strict: true, + }); + + if (!values["parachain-spec"]) { + throw new Error("--parachain-spec is required"); + } + if (!values["relay-chain-spec"]) { + throw new Error("--relay-chain-spec is required"); + } + + const numClients = Number.parseInt(values["num-clients"], 10); + const numRounds = Number.parseInt(values["num-rounds"], 10); + const workers = Number.parseInt(values["workers"], 10); + if (!(numClients > 0)) throw new Error(`--num-clients must be > 0`); + if (!(numRounds > 0)) throw new Error(`--num-rounds must be > 0`); + if (!(workers > 0)) throw new Error(`--workers must be > 0`); + if (workers > numClients) throw new Error(`--workers (${workers}) cannot exceed --num-clients (${numClients})`); + + return { + parachainSpecSource: values["parachain-spec"], + relayChainSpecSource: values["relay-chain-spec"], + falsePositiveRate: Number.parseFloat(values["false-positive-rate"]), + numClients, + numRounds, + workers, + 
messagesPattern: parseMessagesPattern(values["messages-pattern"]), + receiveTimeoutMs: Number.parseInt(values["receive-timeout-ms"], 10), + intervalMs: Number.parseInt(values["interval-ms"], 10), + statementExpiryMs: Number.parseInt(values["statement-expiry-ms"], 10), + warmupMs: Number.parseInt(values["warmup-ms"], 10), + failFast: values["fail-fast"], + logLevel: LEVEL[values["log-level"].toUpperCase()] ?? LEVEL.INFO, + }; +} + +function logConfiguration(log, args) { + const pattern = args.messagesPattern.map(([c, s]) => `${c}x${s}B`).join(", "); + log.info( + `Starting Statement Store Latency Benchmark: ` + + `parachain_spec=${args.parachainSpecSource} ` + + `relay_chain_spec=${args.relayChainSpecSource} ` + + `clients=${args.numClients} rounds=${args.numRounds} ` + + `workers=${args.workers} ` + + `interval=${args.intervalMs}ms pattern=[${pattern}]`, + ); +} + +async function spawnClient({ clientId, args, parachainSpec, relaySpec, log }) { + const debugClientId = + process.env.BENCH_DEBUG_CLIENT !== undefined + ? Number.parseInt(process.env.BENCH_DEBUG_CLIENT, 10) + : null; + const isDebugClient = debugClientId !== null && debugClientId === clientId; + + const smoldot = start({ + maxLogLevel: isDebugClient ? 
4 : 2, + logCallback: (lvl, target, msg) => { + if (isDebugClient) { + log.forSmoldot(lvl, `c${clientId}/${target}`, msg); + return; + } + if ( + target.startsWith("json-rpc-") && + msg.includes("statement_subscribeStatement") + ) + return; + if ( + target.startsWith("sync-service-") && + msg.startsWith("Error while verifying justification") + ) + return; + log.forSmoldot(lvl, target, msg); + }, + }); + + const relayChain = await smoldot.addChain({ chainSpec: relaySpec, disableJsonRpc: true }); + const parachain = await smoldot.addChain({ + chainSpec: parachainSpec, + potentialRelayChains: [relayChain], + statementStore: { falsePositiveRate: args.falsePositiveRate }, + }); + const chains = [relayChain, parachain]; + + const rpc = new SmoldotRpc(parachain, { + onUnexpected: (e) => log.warn(`client ${clientId} rpc: ${e.message}`), + }); + + // Optional periodic peer-health probe per client. Enable with + // BENCH_HEALTH_INTERVAL_MS=. Useful to compare succeeding vs + // failing clients' connectivity over the run. + const healthIntervalMs = Number.parseInt( + process.env.BENCH_HEALTH_INTERVAL_MS ?? "0", + 10, + ); + let healthTimer = null; + if (healthIntervalMs > 0) { + healthTimer = setInterval(async () => { + try { + const h = await rpc.request("system_health", []); + log.info( + `client ${clientId} health: peers=${h.peers} syncing=${h.isSyncing} shouldHavePeers=${h.shouldHavePeers}`, + ); + } catch (e) { + log.warn(`client ${clientId} health probe failed: ${e.message}`); + } + }, healthIntervalMs); + } + + return { + smoldot, + chains, + rpc, + cleanup: async () => { + if (healthTimer) clearInterval(healthTimer); + rpc.stop(); + try { + for (const c of chains) c.remove(); + } catch {} + try { + await smoldot.terminate(); + } catch {} + }, + }; +} + +// Run a contiguous range [clientStart, clientEnd) of clients in this process. +// Used both by single-process mode (range = [0, numClients)) and by each +// child worker. 
+// +// `barrier` synchronises the start of round 1 across all clients (and across +// workers when multi-worker). After warmup each client calls `barrier.arrive()` +// then awaits `barrier.waitStart()` before entering the round loop. Subsequent +// rounds are paced independently by `intervalMs` in `runClient`. +async function runWorker({ args, clientStart, clientEnd, testRunId, log, barrier }) { + const [parachainSpec, relaySpec] = await Promise.all([ + loadChainSpec(args.parachainSpecSource), + loadChainSpec(args.relayChainSpecSource), + ]); + + const abortController = new AbortController(); + const handles = []; + + for (let clientId = clientStart; clientId < clientEnd; clientId++) { + handles.push((async () => { + let resources; + try { + resources = await spawnClient({ clientId, args, parachainSpec, relaySpec, log }); + } catch (e) { + return { + successes: [], + failures: [ + fail(log, clientId, null, FailureKind.TaskPanicked, `failed to start smoldot: ${e.message}`), + ], + }; + } + + try { + const pair = await getKeypair(clientId); + const config = { + clientId, + neighbourId: (clientId + 1) % args.numClients, + numClients: args.numClients, + numRounds: args.numRounds, + testRunId, + messagesPattern: args.messagesPattern, + receiveTimeoutMs: args.receiveTimeoutMs, + intervalMs: args.intervalMs, + statementExpiryMs: args.statementExpiryMs, + failFast: args.failFast, + }; + + // Warm-up: smoldot has no JSON-RPC signal for "I have a statement-store + // peer", but addChain returns long before the chain has finalized its + // first block, and statement_submitStatement just hangs silently on an + // unbootstrapped parachain. Poll system_health as a proxy and proceed + // once peers > 0 && !isSyncing, capped at --warmup-ms as a safety net. 
+ const warmupDeadline = Date.now() + args.warmupMs; + let warmupReady = false; + while (Date.now() < warmupDeadline) { + try { + const h = await resources.rpc.request("system_health", []); + if (h && h.peers > 0 && h.isSyncing === false) { + warmupReady = true; + break; + } + } catch {} + await new Promise((r) => setTimeout(r, 1000)); + } + if (!warmupReady) { + log.warn( + `Client ${clientId}: warmup deadline (${args.warmupMs}ms) reached without system_health ready; proceeding anyway`, + ); + } + + // Synchronise round 1 across all clients globally. Without this each + // client enters round 1 as soon as its own warmup completes, drifts on + // round-1 receive timeouts, and senders end up emitting statements for + // round N+k while receivers are still subscribed for round N. + barrier.arrive(); + await barrier.waitStart(); + + const result = await runClient({ + config, + rpc: resources.rpc, + pair, + abortSignal: abortController.signal, + log, + }); + + if (args.failFast && result.failures.length > 0) { + abortController.abort(); + } + + return result; + } catch (e) { + return { + successes: [], + failures: [ + fail(log, clientId, null, FailureKind.TaskPanicked, e?.message ?? String(e)), + ], + }; + } finally { + await resources.cleanup(); + } + })()); + } + + const results = await Promise.all(handles); + const successes = []; + const failures = []; + for (const r of results) { + successes.push(...r.successes); + failures.push(...r.failures); + } + return { successes, failures }; +} + +// Compute [start, end) for worker `i` of `total`, splitting `n` clients +// evenly with the remainder distributed across the first `n % total` workers. +function workerRange(i, total, n) { + const base = Math.floor(n / total); + const rem = n % total; + const start = i * base + Math.min(i, rem); + const size = base + (i < rem ? 1 : 0); + return [start, start + size]; +} + +// Barrier that releases once all `expected` participants have arrived. 
In +// multi-worker mode, the parent uses one of these to gate the children, and +// each child uses one of these locally over its own clients before reporting +// "ready" up to the parent. +function makeBarrier(expected, onAllArrived) { + let arrived = 0; + let releaseStart; + const startPromise = new Promise((r) => { releaseStart = r; }); + return { + arrive: () => { + arrived += 1; + if (arrived === expected && onAllArrived) onAllArrived(); + }, + waitStart: () => startPromise, + release: () => releaseStart(), + }; +} + +async function runAsParent(args, log) { + const testRunId = randomBytes(8).readBigUInt64LE(0).toString(); + logConfiguration(log, args); + log.info(`Spawning ${args.numClients} client tasks... ${testRunId}`); + + if (args.workers === 1) { + const barrier = makeBarrier(args.numClients, () => { + log.info(`All ${args.numClients} clients ready; starting round 1`); + barrier.release(); + }); + const { successes, failures } = await runWorker({ + args, + clientStart: 0, + clientEnd: args.numClients, + testRunId, + log, + barrier, + }); + reportResults(log, successes, failures, args.numClients, args.numRounds); + process.exit(failures.length > 0 && successes.length === 0 ? 1 : 0); + } + + // Multi-worker: fork one child process per worker. Each child runs its own + // event loop and its own slice of clients in-process. + const selfPath = fileURLToPath(import.meta.url); + const childResults = []; + + const childProcs = []; + const childPromises = []; + // Round-1 barrier across workers: each child sends `worker-ready` once all + // its local clients have warmed up; once every child has reported, the + // parent broadcasts `start` and all clients begin round 1 simultaneously. 
+ const workerBarrier = makeBarrier(args.workers, () => { + log.info(`All ${args.workers} workers ready; starting round 1`); + for (const c of childProcs) c.send({ type: "start" }); + }); + for (let i = 0; i < args.workers; i++) { + const [start, end] = workerRange(i, args.workers, args.numClients); + log.info(`Forking worker ${i + 1}/${args.workers}: clients [${start}, ${end})`); + const child = fork(selfPath, process.argv.slice(2), { + env: { + ...process.env, + BENCH_WORKER_ID: String(i), + BENCH_WORKER_COUNT: String(args.workers), + BENCH_TEST_RUN_ID: testRunId, + BENCH_CLIENT_START: String(start), + BENCH_CLIENT_END: String(end), + }, + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); + childProcs.push(child); + + childPromises.push( + new Promise((resolve, reject) => { + let result = null; + child.on("message", (msg) => { + if (!msg) return; + if (msg.type === "worker-ready") workerBarrier.arrive(); + else if (msg.type === "result") result = msg; + }); + child.on("exit", (code) => { + if (result) { + childResults.push(result); + resolve(); + } else { + reject(new Error(`worker ${i} exited with code ${code} without sending a result`)); + } + }); + child.on("error", reject); + }), + ); + } + + // If any worker fails, kill the rest so they don't outlive the parent + // holding smoldot peers / sockets. + try { + await Promise.all(childPromises); + } catch (e) { + for (const c of childProcs) { + if (c.exitCode === null && c.signalCode === null) { + try { c.kill("SIGTERM"); } catch {} + } + } + await Promise.allSettled(childPromises); + throw e; + } + + const successes = []; + const failures = []; + for (const r of childResults) { + successes.push(...r.successes); + failures.push(...r.failures); + } + reportResults(log, successes, failures, args.numClients, args.numRounds); + process.exit(failures.length > 0 && successes.length === 0 ? 
1 : 0); +} + +async function runAsChild(args) { + const workerId = Number.parseInt(process.env.BENCH_WORKER_ID, 10); + const clientStart = Number.parseInt(process.env.BENCH_CLIENT_START, 10); + const clientEnd = Number.parseInt(process.env.BENCH_CLIENT_END, 10); + const testRunId = process.env.BENCH_TEST_RUN_ID; + + const log = makeLogger(args.logLevel, `[w${workerId}] `); + + // Local barrier across this worker's clients: when all have warmed up, tell + // the parent. Then wait for the parent's `start` broadcast (issued once + // every worker has reported ready) and release the local clients. + const localCount = clientEnd - clientStart; + const barrier = makeBarrier(localCount, () => { + process.send({ type: "worker-ready" }); + }); + process.on("message", (msg) => { + if (msg && msg.type === "start") barrier.release(); + }); + + const { successes, failures } = await runWorker({ + args, + clientStart, + clientEnd, + testRunId, + log, + barrier, + }); + + if (typeof process.send === "function") { + await new Promise((resolve) => + process.send({ type: "result", successes, failures }, undefined, undefined, resolve), + ); + } + process.exit(0); +} + +async function main() { + let args; + try { + args = parseFlags(process.argv.slice(2)); + } catch (e) { + process.stderr.write(`Error: ${e.message}\n`); + process.exit(2); + } + + if (process.env.BENCH_WORKER_ID !== undefined) { + await runAsChild(args); + return; + } + + const log = makeLogger(args.logLevel); + await runAsParent(args, log); +} + +main().catch((e) => { + process.stderr.write(`fatal: ${e?.stack ?? e}\n`); + process.exit(1); +}); diff --git a/examples/statement-latency-bench/chainspec.js b/examples/statement-latency-bench/chainspec.js new file mode 100644 index 0000000000..d69d9b66be --- /dev/null +++ b/examples/statement-latency-bench/chainspec.js @@ -0,0 +1,16 @@ +import { readFile } from "node:fs/promises"; + +// Accepts an http(s) URL or a local file path. 
Bootnodes are expected to be +// embedded in the spec already (the canonical source is the paritytech/chainspecs +// repo and smoldot's bundled demo-chain-specs/, both of which ship bootnodes +// inside the spec). +export async function loadChainSpec(source) { + if (/^https?:\/\//i.test(source)) { + const res = await fetch(source); + if (!res.ok) { + throw new Error(`Failed to fetch ${source}: ${res.status} ${res.statusText}`); + } + return await res.text(); + } + return await readFile(source, "utf8"); +} diff --git a/examples/statement-latency-bench/client.js b/examples/statement-latency-bench/client.js new file mode 100644 index 0000000000..2295b28be5 --- /dev/null +++ b/examples/statement-latency-bench/client.js @@ -0,0 +1,247 @@ +import { blake2AsU8a } from "@polkadot/util-crypto"; +import { u8aToHex } from "@polkadot/util"; +import { encodeStatement, expiryFromParts } from "./statement.js"; +import { FailureKind, fail } from "./stats.js"; + +// BoundedVec> on the statement_subscribeStatement filter side. 
+const MAX_TOPICS = 128; + +const enc = new TextEncoder(); + +function generateTopic(testRunId, clientId, round, msgIdx) { + const s = `${testRunId}-${clientId}-${round}-${msgIdx}`; + return blake2AsU8a(enc.encode(s), 256); +} + +function u32LeBytes(n) { + const out = new Uint8Array(4); + new DataView(out.buffer).setUint32(0, n >>> 0, true); + return out; +} + +function messagesPerClient(pattern) { + return pattern.reduce((sum, [count]) => sum + count, 0); +} + +function isLeader(clientId) { + return clientId === 0; +} + +function sleep(ms, signal) { + return new Promise((resolve, reject) => { + if (signal?.aborted) return reject(new Error("aborted")); + const t = setTimeout(resolve, ms); + signal?.addEventListener( + "abort", + () => { + clearTimeout(t); + reject(new Error("aborted")); + }, + { once: true }, + ); + }); +} + +function withTimeout(promise, ms) { + let timer; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error("timeout")), ms); + }); + return Promise.race([promise.then((v) => v), timeout]).finally(() => + clearTimeout(timer), + ); +} + +async function executeRound({ round, config, rpc, pair, log }) { + const { + clientId, + neighbourId, + numRounds, + testRunId, + messagesPattern, + receiveTimeoutMs, + statementExpiryMs, + } = config; + + const expectedCount = messagesPerClient(messagesPattern); + if (expectedCount > MAX_TOPICS) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.TooManyTopics, + `max ${MAX_TOPICS}, got ${expectedCount}`, + ); + } + + const roundStart = performance.now(); + + const expectedTopics = []; + for (let idx = 0; idx < expectedCount; idx++) { + expectedTopics.push(u8aToHex(generateTopic(testRunId, neighbourId, round, idx))); + } + + let subscription; + try { + subscription = await rpc.subscribe("statement_subscribeStatement", [ + { matchAny: expectedTopics }, + ]); + } catch (e) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubscribeFailed, 
+ e.message, + ); + } + + let sentCount = 0; + try { + for (const [count, size] of messagesPattern) { + for (let i = 0; i < count; i++) { + const topic = generateTopic(testRunId, clientId, round, sentCount); + const channel = blake2AsU8a(u32LeBytes(sentCount), 256); + const expiryTs = Math.floor((Date.now() + statementExpiryMs) / 1000); + const sequence = (sentCount + 1) * round; + const expiry = expiryFromParts(expiryTs, sequence); + const data = new Uint8Array(size); // zero-filled, matching the Rust bench + + const hex = encodeStatement({ pair, expiry, channel, topic, data }); + + let result; + try { + result = await rpc.request("statement_submit", [hex]); + } catch (e) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubmitFailed, + e.message, + ); + } + + sentCount += 1; + if (isLeader(clientId)) { + log.debug( + `Round ${round}/${numRounds}. Sent ${sentCount} statement(s): ${JSON.stringify(result)}`, + ); + } + } + } + + const sendDuration = (performance.now() - roundStart) / 1000; + + let receivedCount = 0; + while (receivedCount < expectedCount) { + let payload; + try { + payload = await withTimeout(subscription.next(), receiveTimeoutMs); + } catch (e) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.PropagationTimeout, + `received ${receivedCount}/${expectedCount} after ${receiveTimeoutMs}ms`, + ); + } + + if (payload === null) { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubscriptionClosed, + `received ${receivedCount}/${expectedCount}`, + ); + } + + if (payload?.event === "newStatements" && Array.isArray(payload.data?.statements)) { + const batch = payload.data.statements.length; + receivedCount += batch; + if (isLeader(clientId)) { + log.debug( + `Round ${round}/${numRounds}. 
Received ${receivedCount} statement(s) (batch of ${batch})`, + ); + } + } else { + return fail( + log, + clientId, + [round, numRounds], + FailureKind.SubscriptionStreamError, + `received ${receivedCount}/${expectedCount}, unexpected payload: ${JSON.stringify(payload)}`, + ); + } + } + + const fullLatency = (performance.now() - roundStart) / 1000; + const receiveDuration = fullLatency - sendDuration; + + if (isLeader(clientId)) { + log.debug( + `Round ${round}/${numRounds} complete. ` + + `Send: ${sendDuration.toFixed(3)}s, ` + + `Receive: ${receiveDuration.toFixed(3)}s, ` + + `Total: ${fullLatency.toFixed(3)}s`, + ); + } + + return { + round, + sent_count: sentCount, + received_count: receivedCount, + send_duration_secs: sendDuration, + receive_duration_secs: receiveDuration, + full_latency_secs: fullLatency, + }; + } finally { + try { + await rpc.unsubscribe("statement_unsubscribeStatement", subscription.id); + } catch { + // best-effort; if the subscription is already gone we don't care + } + } +} + +export async function runClient({ config, rpc, pair, abortSignal, log }) { + const successes = []; + const failures = []; + + for (let round = 1; round <= config.numRounds; round++) { + if (abortSignal?.aborted) { + failures.push(FailureKind.PeerFailed); + break; + } + + const roundStart = performance.now(); + const result = await executeRound({ round, config, rpc, pair, log }); + + if (typeof result === "string") { + failures.push(result); + if (config.failFast) break; + } else { + successes.push(result); + } + + if (round < config.numRounds) { + const elapsedMs = performance.now() - roundStart; + if (elapsedMs < config.intervalMs) { + try { + await sleep(config.intervalMs - elapsedMs, abortSignal); + } catch { + failures.push(FailureKind.PeerFailed); + break; + } + } else if (isLeader(config.clientId)) { + log.warn( + `Client ${config.clientId}: Round ${round} took longer (${Math.round(elapsedMs)}ms) than target (${config.intervalMs}ms)`, + ); + } + } + } + + 
return { successes, failures }; +} diff --git a/examples/statement-latency-bench/keypair.js b/examples/statement-latency-bench/keypair.js new file mode 100644 index 0000000000..3b1ac25f0a --- /dev/null +++ b/examples/statement-latency-bench/keypair.js @@ -0,0 +1,19 @@ +import { Keyring } from "@polkadot/keyring"; +import { cryptoWaitReady } from "@polkadot/util-crypto"; + +let keyring = null; + +async function getKeyring() { + if (!keyring) { + await cryptoWaitReady(); + keyring = new Keyring({ type: "sr25519" }); + } + return keyring; +} + +// Mirrors sc_statement_store::test_utils::get_keypair (substrate), +// which derives `//StatementClient//{idx}`. +export async function getKeypair(idx) { + const ring = await getKeyring(); + return ring.createFromUri(`//StatementClient//${idx}`); +} diff --git a/examples/statement-latency-bench/package.json b/examples/statement-latency-bench/package.json new file mode 100644 index 0000000000..ae7cc919d9 --- /dev/null +++ b/examples/statement-latency-bench/package.json @@ -0,0 +1,21 @@ +{ + "name": "statement-latency-bench", + "version": "0.1.0", + "private": true, + "type": "module", + "bin": { + "statement-latency-bench": "./bench.js" + }, + "scripts": { + "start": "node bench.js" + }, + "dependencies": { + "@polkadot/keyring": "^13.0.2", + "@polkadot/util": "^13.0.2", + "@polkadot/util-crypto": "^13.0.2", + "smoldot": "file:../../wasm-node/javascript" + }, + "engines": { + "node": ">=20" + } +} diff --git a/examples/statement-latency-bench/run-local.sh b/examples/statement-latency-bench/run-local.sh new file mode 100755 index 0000000000..5a1477bfab --- /dev/null +++ b/examples/statement-latency-bench/run-local.sh @@ -0,0 +1,167 @@ +#!/usr/bin/env bash +# Local helper: scrape chain specs + bootnodes out of a running zombienet +# (or polkadot-omni-node) network, then run the bench against it. +# Discovers ALL collators (and ALL relay validators) and adds each as a +# bootnode in the chain specs handed to smoldot. 
set -euo pipefail

# Run from the bench directory so all relative paths resolve.
cd "$(dirname "$0")"

# Query a node's libp2p PeerId via the legacy `system_localPeerId` JSON-RPC.
# Prints the peer id on stdout, or nothing when the call fails or has no result.
peer_id_of() {
  local rpc_port="$1"
  local payload='{"jsonrpc":"2.0","method":"system_localPeerId","params":[],"id":1}'
  curl -sS -m 5 -H 'Content-Type: application/json' \
    -d "$payload" \
    "http://127.0.0.1:${rpc_port}" \
    | jq -r '.result // empty'
}

# Extract the tcp port from a `--listen-addr` value like
# `/ip4/0.0.0.0/tcp/54115/ws`. Prints nothing when no tcp component is present.
tcp_port_of_listen_addr() {
  local addr="$1"
  echo "$addr" | sed -n 's,.*tcp/\([0-9][0-9]*\).*,\1,p'
}
>&2 + exit 1 +fi + +PARACHAIN_SPEC="" +RELAY_SPEC="" +parachain_bootnodes=() + +for cmd in "${COLLATORS[@]}"; do + parachain_part=$(echo "$cmd" | sed 's/ -- .*//') + relay_part=$(echo "$cmd" | sed 's/.* -- //') + + [ -z "$PARACHAIN_SPEC" ] && \ + PARACHAIN_SPEC=$(echo "$parachain_part" | sed -n 's/.*--chain \([^ ]*\).*/\1/p; q') + [ -z "$RELAY_SPEC" ] && \ + RELAY_SPEC=$(echo "$relay_part" | sed -n 's/.*--chain \([^ ]*\).*/\1/p; q') + + rpc_port=$(echo "$parachain_part" | sed -n 's/.*--rpc-port \([0-9][0-9]*\).*/\1/p') + listen_addr=$(echo "$parachain_part" | sed -n 's/.*--listen-addr \([^ ]*\).*/\1/p') + p2p_port=$(tcp_port_of_listen_addr "$listen_addr") + + if [ -z "$rpc_port" ] || [ -z "$p2p_port" ]; then + echo " skipping collator (no rpc-port or listen-addr): $parachain_part" >&2 + continue + fi + + peer_id=$(peer_id_of "$rpc_port" || true) + if [ -z "$peer_id" ]; then + echo " skipping collator on rpc :$rpc_port (failed to fetch peer id)" >&2 + continue + fi + + parachain_bootnodes+=("/ip4/127.0.0.1/tcp/${p2p_port}/ws/p2p/${peer_id}") +done + +if [ -z "$PARACHAIN_SPEC" ] || [ -z "$RELAY_SPEC" ]; then + echo "Error: failed to extract --chain paths from collator commands." 
>&2 + exit 1 +fi + +# --- discover running relay validators --------------------------------------- +VALIDATORS=() +while IFS= read -r line; do + [ -n "$line" ] && VALIDATORS+=("$line") +done < <(ps -axww -o command= \ + | awk '/(^|\/)polkadot / && !/polkadot-(parachain|omni-node|prepare-worker|execute-worker)/' \ + | grep -v grep || true) + +relay_bootnodes=() +for cmd in "${VALIDATORS[@]}"; do + rpc_port=$(echo "$cmd" | sed -n 's/.*--rpc-port \([0-9][0-9]*\).*/\1/p') + listen_addr=$(echo "$cmd" | sed -n 's/.*--listen-addr \([^ ]*\).*/\1/p') + p2p_port=$(tcp_port_of_listen_addr "$listen_addr") + + if [ -z "$rpc_port" ] || [ -z "$p2p_port" ]; then + continue + fi + + peer_id=$(peer_id_of "$rpc_port" || true) + if [ -z "$peer_id" ]; then + continue + fi + + relay_bootnodes+=("/ip4/127.0.0.1/tcp/${p2p_port}/ws/p2p/${peer_id}") +done + +echo "Parachain spec : $PARACHAIN_SPEC" +echo "Relay spec : $RELAY_SPEC" +echo "Parachain bootnodes : ${#parachain_bootnodes[@]}" +if [ "${#parachain_bootnodes[@]}" -gt 0 ]; then + for n in "${parachain_bootnodes[@]}"; do echo " - $n"; done +fi +echo "Relay bootnodes : ${#relay_bootnodes[@]}" +if [ "${#relay_bootnodes[@]}" -gt 0 ]; then + for n in "${relay_bootnodes[@]}"; do echo " - $n"; done +fi + +# --- copy + patch specs ------------------------------------------------------ +mkdir -p chain-specs + +if [ "${#parachain_bootnodes[@]}" -gt 0 ]; then + para_bn_json=$(printf '%s\n' "${parachain_bootnodes[@]}" | jq -R . | jq -s .) +else + para_bn_json='[]' +fi +if [ "${#relay_bootnodes[@]}" -gt 0 ]; then + relay_bn_json=$(printf '%s\n' "${relay_bootnodes[@]}" | jq -R . | jq -s .) +else + relay_bn_json='[]' +fi + +# Parachain: id renamed to "parachain" so it's stable across runs (matches +# examples/statement-chat/dev.sh) and discovered bootnodes appended. 
# --- copy + patch specs ------------------------------------------------------
# Parachain spec: rewrite the id to the fixed string "parachain" and append
# the discovered bootnodes. NOTE(review): presumably bench.js/smoldot keys on
# this stable id across runs — confirm before changing it.
jq --argjson bn "$para_bn_json" \
  '.id = "parachain" | .bootNodes = ((.bootNodes // []) + $bn)' \
  "$PARACHAIN_SPEC" > chain-specs/parachain.json
echo "Wrote chain-specs/parachain.json"

# Relay spec: only append bootnodes; the id is left untouched.
jq --argjson bn "$relay_bn_json" \
  '.bootNodes = ((.bootNodes // []) + $bn)' \
  "$RELAY_SPEC" > chain-specs/relay.json
echo "Wrote chain-specs/relay.json"

# --- build smoldot if needed --------------------------------------------------
# Directory-existence check only: a stale dist/ is NOT rebuilt automatically.
if [ ! -d ../../wasm-node/javascript/dist ]; then
  echo ""
  echo "Building smoldot ..."
  (cd ../../wasm-node/javascript && npm install && npm run build)
fi

# --- npm install bench deps ---------------------------------------------------
# Same caveat: an existing node_modules/ skips the install even if
# package.json changed since.
if [ ! -d node_modules ]; then
  echo ""
  echo "Installing bench deps ..."
  npm install
fi

# --- run ----------------------------------------------------------------------
# Environment variables override the defaults; any extra CLI flags are passed
# through to bench.js via "$@".
echo ""
echo "Running bench ..."
exec node bench.js \
  --parachain-spec ./chain-specs/parachain.json \
  --relay-chain-spec ./chain-specs/relay.json \
  --num-clients "${NUM_CLIENTS:-4}" \
  --num-rounds "${NUM_ROUNDS:-2}" \
  --messages-pattern "${MESSAGES_PATTERN:-3:128}" \
  --receive-timeout-ms "${RECEIVE_TIMEOUT_MS:-30000}" \
  --interval-ms "${INTERVAL_MS:-5000}" \
  --warmup-ms "${WARMUP_MS:-15000}" \
  --workers "${WORKERS:-1}" \
  --log-level "${LOG_LEVEL:-info}" \
  "$@"
// JSON-RPC plumbing for a smoldot Chain. Maintains:
// - a request-id counter plus a `pending` Map for request/response correlation
// - per-subscription notification queues
// - a single background loop pulling from chain.nextJsonRpcResponse()
export class SmoldotRpc {
  /**
   * @param {object} chain smoldot Chain exposing sendJsonRpc() and
   *   nextJsonRpcResponse().
   * @param {{ onUnexpected?: (err: Error) => void }} [opts] invoked for
   *   malformed or unmatched JSON-RPC traffic; defaults to a no-op.
   */
  constructor(chain, { onUnexpected } = {}) {
    this.chain = chain;
    this.nextId = 1;
    this.pending = new Map(); // request id -> { resolve, reject }
    this.subs = new Map(); // subscriptionId -> { queue: [], waiters: [], closed }
    this.onUnexpected = onUnexpected ?? (() => {});
    this.stopped = false;
    this.loop = this.#run(); // background read loop promise
  }

  /**
   * Send one JSON-RPC request; resolves with the response's `result`.
   * Rejects on a JSON-RPC error response, on stop(), or when sendJsonRpc()
   * itself throws synchronously.
   */
  async request(method, params) {
    const id = String(this.nextId++);
    const body = JSON.stringify({ jsonrpc: "2.0", id, method, params });
    const promise = new Promise((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
    });
    try {
      this.chain.sendJsonRpc(body);
    } catch (err) {
      // Bug fix: a synchronous sendJsonRpc() failure previously left the
      // entry in `pending` forever, and the returned promise never settled.
      // Clean up and surface the error to the caller instead.
      this.pending.delete(id);
      throw err;
    }
    return promise;
  }

  // Subscribe and return { id, next: () => Promise, close }.
  // `next()` resolves with the next notification payload, or `null` once the
  // subscription is closed.
  async subscribe(subscribeMethod, params) {
    const id = await this.request(subscribeMethod, params);
    const sub = { queue: [], waiters: [], closed: false };
    this.subs.set(id, sub);
    return {
      id,
      next: () => {
        if (sub.queue.length > 0) return Promise.resolve(sub.queue.shift());
        if (sub.closed) return Promise.resolve(null);
        return new Promise((resolve) => sub.waiters.push(resolve));
      },
      close: () => this.#closeSub(id),
    };
  }

  // Ask the server to drop the subscription, then close it locally even when
  // the unsubscribe request fails.
  async unsubscribe(unsubscribeMethod, subscriptionId) {
    try {
      await this.request(unsubscribeMethod, [subscriptionId]);
    } finally {
      this.#closeSub(subscriptionId);
    }
  }

  // Mark a subscription closed and wake every pending next() caller with null.
  #closeSub(id) {
    const sub = this.subs.get(id);
    if (!sub || sub.closed) return;
    sub.closed = true;
    for (const w of sub.waiters) w(null);
    sub.waiters.length = 0;
    this.subs.delete(id);
  }

  // Reject all in-flight requests and close every subscription. The read loop
  // exits the next time it observes `stopped`.
  stop() {
    this.stopped = true;
    for (const { reject } of this.pending.values()) {
      reject(new Error("RPC client stopped"));
    }
    this.pending.clear();
    for (const id of [...this.subs.keys()]) this.#closeSub(id);
  }

  // Background loop: demultiplex responses (matched by id) and subscription
  // notifications (matched by params.subscription) coming out of smoldot.
  async #run() {
    while (!this.stopped) {
      let raw;
      try {
        raw = await this.chain.nextJsonRpcResponse();
      } catch (e) {
        // Only report the failure if we did not stop on purpose.
        if (!this.stopped) this.onUnexpected(e);
        return;
      }
      let msg;
      try {
        msg = JSON.parse(raw);
      } catch (e) {
        this.onUnexpected(new Error(`bad JSON-RPC payload: ${e.message}`));
        continue;
      }

      if (msg.id !== undefined) {
        const pending = this.pending.get(msg.id);
        if (!pending) {
          this.onUnexpected(new Error(`response for unknown id ${msg.id}`));
          continue;
        }
        this.pending.delete(msg.id);
        if (msg.error) pending.reject(new Error(`JSON-RPC error: ${msg.error.message}`));
        else pending.resolve(msg.result);
        continue;
      }

      if (msg.method && msg.params?.subscription !== undefined) {
        const sub = this.subs.get(msg.params.subscription);
        if (!sub) continue; // stray notification (likely after unsubscribe)
        const payload = msg.params.result;
        if (sub.waiters.length > 0) sub.waiters.shift()(payload);
        else sub.queue.push(payload);
        continue;
      }

      this.onUnexpected(new Error(`unrecognized JSON-RPC message: ${raw}`));
    }
  }
}
// Field discriminants (lib/src/network/codec/statement.rs).
const FIELD_PROOF = 0;
const FIELD_EXPIRY = 2;
const FIELD_CHANNEL = 3;
const FIELD_TOPIC_START = 4;
const FIELD_DATA = 8;

const PROOF_SR25519 = 0;

// Serialize a u64 (number or BigInt) into 8 little-endian bytes.
// Out-of-range values are rejected by DataView.setBigUint64.
function u64ToLeBytes(n) {
  const bytes = new Uint8Array(8);
  new DataView(bytes.buffer).setBigUint64(0, BigInt(n), true);
  return bytes;
}

// Pack (timestamp_secs, sequence) into the u64 expiry per
// sp_statement_store::Statement::set_expiry_from_parts:
// expiry = (timestamp_secs as u64) << 32 | sequence as u64
export function expiryFromParts(timestampSecs, sequence) {
  const high = BigInt(timestampSecs) << 32n;
  return high | BigInt(sequence);
}

// Build the bytes signed over by the proof, mirroring
// sp_statement_store::Statement::encoded(for_signing=true) in polkadot-sdk:
// no leading field-count compact, no proof field, everything else in ascending
// discriminant order (expiry, channel?, topic, data).
function buildSignatureMaterial({ expiry, channel, topic, data }) {
  const pieces = [new Uint8Array([FIELD_EXPIRY]), u64ToLeBytes(expiry)];

  if (channel) {
    pieces.push(new Uint8Array([FIELD_CHANNEL]), channel);
  }

  pieces.push(new Uint8Array([FIELD_TOPIC_START]), topic);
  pieces.push(new Uint8Array([FIELD_DATA]), compactToU8a(data.length), data);

  return u8aConcat(...pieces);
}
// Encode a full sr25519-signed statement into its wire format.
//
// Wire format (matches lib/src/network/codec/statement.rs and
// sp_statement_store::Statement::encoded(false)): SCALE-compact field count,
// then ascending (discriminant, value) pairs, the proof field first.
export function encodeStatement({ pair, expiry, channel, topic, data }) {
  // The proof signs over the statement *without* the proof field itself.
  const signature = pair.sign(
    buildSignatureMaterial({ expiry, channel, topic, data }),
  );

  const fields = [
    new Uint8Array([FIELD_PROOF, PROOF_SR25519]),
    signature,
    pair.publicKey,
    new Uint8Array([FIELD_EXPIRY]),
    u64ToLeBytes(expiry),
  ];

  if (channel) {
    fields.push(new Uint8Array([FIELD_CHANNEL]), channel);
  }

  fields.push(new Uint8Array([FIELD_TOPIC_START]), topic);
  fields.push(new Uint8Array([FIELD_DATA]), compactToU8a(data.length), data);

  // proof + expiry + topic + data are always present; channel is optional.
  const fieldCount = channel ? 5 : 4;

  return u8aToHex(u8aConcat(compactToU8a(fieldCount), ...fields));
}

// Closed set of failure categories, mirroring the Rust bench's FailureKind enum.
export const FailureKind = {
  TooManyTopics: "Too many topics",
  SubscribeFailed: "Failed to open RPC subscription",
  SubmitFailed: "Failed to submit statement via RPC",
  PropagationTimeout: "Statement propagation timeout",
  SubscriptionClosed: "Subscription closed by server",
  SubscriptionStreamError: "Subscription stream error",
  PeerFailed: "Peer failed; stopping early",
  TaskPanicked: "Task panicked",
};

// Logs the failure with bench.rs's wording and returns the kind so callers can
// `return fail(...)`.
export function fail(log, clientId, roundInfo, kind, detail) {
  let prefix = "";
  if (roundInfo) {
    prefix = `Round ${roundInfo[0]}/${roundInfo[1]}: `;
  }
  log.warn(`Client ${clientId}: ${prefix}${kind} (${detail})`);
  return kind;
}
// min/avg/max over an iterable of numbers. An empty input yields
// { min: Infinity, avg: NaN, max: -Infinity }; callers guard against that.
function calcStats(values) {
  const samples = [...values];
  let min = Infinity;
  let max = -Infinity;
  let sum = 0;
  for (const v of samples) {
    min = Math.min(min, v);
    max = Math.max(max, v);
    sum += v;
  }
  return { min, avg: sum / samples.length, max };
}

// Format a duration with millisecond precision, as bench.rs does.
function f3(n) {
  return n.toFixed(3);
}

// Emit the final summary lines: a warning listing failure kinds by frequency,
// an aggregate latency line when any round succeeded, and a closing line with
// the round/client totals. Wording matches the Rust bench.
export function reportResults(log, successes, failures, numClients, numRounds) {
  if (failures.length > 0) {
    const tally = new Map();
    for (const kind of failures) {
      tally.set(kind, (tally.get(kind) ?? 0) + 1);
    }
    const errorsStr = [...tally.entries()]
      .sort((a, b) => b[1] - a[1])
      .map(([kind, count]) => `${kind} (${count})`)
      .join("; ");
    log.warn(
      `Benchmark Failed: failed_clients=${failures.length} total_clients=${numClients} errors=[${errorsStr}]`,
    );
  }

  if (successes.length > 0) {
    const send = calcStats(successes.map((s) => s.send_duration_secs));
    const receive = calcStats(successes.map((s) => s.receive_duration_secs));
    const latency = calcStats(successes.map((s) => s.full_latency_secs));
    log.info(
      `Benchmark Results: ` +
        `send_min=${f3(send.min)}s send_avg=${f3(send.avg)}s send_max=${f3(send.max)}s ` +
        `receive_min=${f3(receive.min)}s receive_avg=${f3(receive.avg)}s receive_max=${f3(receive.max)}s ` +
        `latency_min=${f3(latency.min)}s latency_avg=${f3(latency.avg)}s latency_max=${f3(latency.max)}s`,
    );
  }

  const roundsWithAnySuccess = new Set(successes.map((s) => s.round)).size;
  log.info(
    `Benchmark Finished: rounds_with_any_success=${roundsWithAnySuccess} ` +
      `total_rounds=${numRounds} total_clients=${numClients}`,
  );
}