From 441e1379c542cfb2495b434eaf7ecd3cb32348cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20D=C3=ADaz?= Date: Tue, 27 Jan 2026 12:01:22 +0100 Subject: [PATCH 1/2] feat(notarizer): evaluate heartbeat condition before attemptying dry run --- src/bin/bots/notarizer.cjs | 251 +++++++++++++++++++------------------ 1 file changed, 130 insertions(+), 121 deletions(-) diff --git a/src/bin/bots/notarizer.cjs b/src/bin/bots/notarizer.cjs index 61bd25d..45d4435 100644 --- a/src/bin/bots/notarizer.cjs +++ b/src/bin/bots/notarizer.cjs @@ -51,7 +51,7 @@ async function main() { .option( "--max-threads ", "Max. number of simultaneous dry runs", - process.env.WITNET_PFS_DRY_RUN_MAX_THREADS || 1, + process.env.WITNET_PFS_DRY_RUN_MAX_THREADS || 2, ) .option( "--min-balance ", @@ -218,139 +218,148 @@ async function main() { } } - // prepare and spawn new dry-run subprocess: - const dryRunStart = Date.now(); - metrics.dryruns += 1; - console.debug(`[${tag}] Dry-running ${lastDryRunClock ? `after ${commas(dryRunStart - lastDryRunClock)} msecs` : `for the first time`} ...`); - priceFeeds[caption].lastDryRunClock = dryRunStart; - threadBucket.push( - request.execDryRun({ timeout: DRY_RUN_TIMEOUT_SECS * 1000 }) - .then(output => { - if (!output || output === "") throw new Error(`no dry-run report`); - else return JSON.parse(output); - }) - .then(json => { - // parse dry run result - console.debug(`[${tag}] Dry-run solved in ${commas(Date.now() - dryRunStart)} msecs => ${JSON.stringify(json)}`); - if (!Object.keys(json).includes("RadonInteger")) { - throw `Error: unexpected dry run result: ${JSON.stringify(json).slice(0, 2048)}`; - } - const currentValue = parseInt(json.RadonInteger, 10); - - // determine whether a new notarization is required - const heartbeatSecs = Math.floor(Date.now() / 1000) - lastUpdates[caption].timestamp; - if (heartbeatSecs < conditions.heartbeatSecs / 2 + 1) { - const deviation = - lastUpdates[caption].value > 0 - ? (100 * (currentValue - lastUpdates[caption].value)) / - lastUpdates[caption].value - : 0; - if (Math.abs(deviation) < conditions.deviationPercentage) { - console.info( - `[${tag}] ${deviation >= 0 ? "+" : ""}${deviation.toFixed(2)} % deviation after ${heartbeatSecs} secs.` - ) - return; - } else { - console.info( - `[${tag}] Updating due to price deviation of ${deviation.toFixed(2)} % ...`, - ); - } + threadBucket.push(new Promise(async (resolve, reject) => { + + // determine whether a new notarization is required + const heartbeatSecs = Math.floor(Date.now() / 1000) - lastUpdates[caption].timestamp; + if (heartbeatSecs < conditions.heartbeatSecs / 2 + 1) { + // prepare dry-run subprocess + const dryRunStart = Date.now(); + metrics.dryruns += 1; + console.debug(`[${tag}] Dry-running ${lastDryRunClock ? `after ${commas(dryRunStart - lastDryRunClock)} msecs` : `for the first time`} ...`); + priceFeeds[caption].lastDryRunClock = dryRunStart; + + // determine current market value + const currentValue = await request.execDryRun({ timeout: DRY_RUN_TIMEOUT_SECS * 1000 }) + .then(output => { + if (!output || output === "") throw new Error(`no dry-run report`); + else return JSON.parse(output); + }) + .then(json => { + // parse dry run result + console.debug(`[${tag}] Dry-run solved in ${commas(Date.now() - dryRunStart)} msecs => ${JSON.stringify(json)}`); + if (!Object.keys(json).includes("RadonInteger")) { + throw new Error(`unexpected dry run result: ${JSON.stringify(json).slice(0, 2048)}`); + } else { + return parseInt(json.RadonInteger, 10); + } + }) + .catch(err => { + console.warn(`[${tag}] ${debug ? `(after ${commas(Date.now() - dryRunStart)} msecs) ` : " "}Dry-run failed: ${err}`); + metrics.errors += 1; + return null; + }); + + // skip notarization if the dry-run failed + if (currentValue === null) reject(); + + // compute and evaluate current deviation with respect to last notarized value + const deviation = lastUpdates[caption].value > 0 + ? (100 * (currentValue - lastUpdates[caption].value)) / + lastUpdates[caption].value + : 0; + if (Math.abs(deviation) < conditions.deviationPercentage) { + console.info( + `[${tag}] ${deviation >= 0 ? "+" : ""}${deviation.toFixed(2)} % deviation after ${heartbeatSecs} secs.` + ) + resolve(); // skip notarization } else { console.info( - `[${tag}] Updating due to heartbeat after ${heartbeatSecs} secs ...`, + `[${tag}] Updating due to price deviation of ${deviation.toFixed(2)} % ...`, ); } + } else { + console.info( + `[${tag}] Updating due to heartbeat after ${heartbeatSecs} secs ...`, + ); + } + + // create and sign and send new data request transaction + console.debug(`[${tag}] Cache info before sending =>`, ledger.cacheInfo); + const DRs = Witnet.DataRequests.from(ledger, request); + metrics.inflight += 1; + priceFeeds[caption].inflight = (priceFeeds[caption].inflight || 0) + 1; + + // send data request transaction and wait for the notarization of a new price update + await DRs.sendTransaction({ + fees: priority, + witnesses: conditions.minWitnesses, + }).then(tx => { + console.info(`[${tag}] Sending data request transaction => { radHash: ${tx.radHash + } inputs: ${tx.tx?.DataRequest?.signatures.length + } cost: ${Witnet.Coins.fromNanowits(tx.fees.nanowits + tx.value?.nanowits).wits + } weight: ${commas(tx.weight) + } witnesses: ${tx.witnesses + } hash: ${tx.hash + } }`); + metrics.nanowits += tx.fees.nanowits + tx.value?.nanowits; + metrics.queries += 1; + return DRs.confirmTransaction(tx.hash, { + onStatusChange: () => console.info(`[${tag}] DRT status =>`, tx.status), + }) + + }).then(async tx => { + console.debug( + `[${tag}] Cache info after confirmation =>`, + ledger.cacheInfo, + ); - console.debug(`[${tag}] Cache info before sending =>`, ledger.cacheInfo); - - // create, sign and send new data request transaction - const DRs = Witnet.DataRequests.from(ledger, request); - metrics.inflight += 1; - priceFeeds[caption].inflight = (priceFeeds[caption].inflight || 0) + 1; - - // launch promise for the notarization of new price update - DRs.sendTransaction({ - fees: priority, - witnesses: conditions.minWitnesses, - }).then(tx => { - console.info(`[${tag}] Sending data request transaction => { radHash: ${tx.radHash - } inputs: ${tx.tx?.DataRequest?.signatures.length - } cost: ${Witnet.Coins.fromNanowits(tx.fees.nanowits + tx.value?.nanowits).wits - } weight: ${commas(tx.weight) - } witnesses: ${tx.witnesses - } hash: ${tx.hash - } }`); - metrics.nanowits += tx.fees.nanowits + tx.value?.nanowits; - metrics.queries += 1; - return DRs.confirmTransaction(tx.hash, { - onStatusChange: () => console.info(`[${tag}] DRT status =>`, tx.status), - }) - - }).then(async tx => { - console.debug( - `[${tag}] Cache info after confirmation =>`, - ledger.cacheInfo, + // await resolution in Witnet + let status = tx.status; + do { + const report = await ledger.provider.getDataRequest( + tx.hash, + "ethereal", ); - - // await resolution in Witnet - let status = tx.status; - do { - const report = await ledger.provider.getDataRequest( - tx.hash, - "ethereal", + if (report.status !== status) { + status = report.status; + console.info(`[${tag}] DRT status =>`, report.status); + } + if (report.status === "solved" && report?.result) { + const result = utils.cbor.decode( + utils.fromHexString(report.result.cbor_bytes), ); - if (report.status !== status) { - status = report.status; - console.info(`[${tag}] DRT status =>`, report.status); - } - if (report.status === "solved" && report?.result) { - const result = utils.cbor.decode( - utils.fromHexString(report.result.cbor_bytes), - ); - if (Number.isInteger(result)) { - lastUpdates[caption].timestamp = report.result.timestamp; - lastUpdates[caption].value = parseInt(result, 10); - const { value, timestamp } = lastUpdates[caption] - const providers = request.sources - .map(source => { - let parts = source.authority.split(".").slice(-2); - parts[0] = parts[0][0].toUpperCase() + parts[0].slice(1); - return parts.join(".") - }) - .sort(); - console.info(`[${tag}] DRT result => { value: ${value}, ts: ${moment.unix(timestamp).format("MMM Do YYYY HH:mm:ss")}, providers: ${providers.join(" ")} }`); - } else { - throw `Unexpected DRT result => ${result}`; - } - break; + if (Number.isInteger(result)) { + lastUpdates[caption].timestamp = report.result.timestamp; + lastUpdates[caption].value = parseInt(result, 10); + const { value, timestamp } = lastUpdates[caption] + const providers = request.sources + .map(source => { + let parts = source.authority.split(".").slice(-2); + parts[0] = parts[0][0].toUpperCase() + parts[0].slice(1); + return parts.join(".") + }) + .sort(); + console.info(`[${tag}] DRT result => { value: ${value}, ts: ${moment.unix(timestamp).format("MMM Do YYYY HH:mm:ss")}, providers: ${providers.join(" ")} }`); + } else { + throw `Unexpected DRT result => ${result}`; } - const delay = (ms) => - new Promise((_resolve) => setTimeout(_resolve, ms)); - await delay(5000); - } while (status !== "solved"); - - }).then(() => { - priceFeeds[caption].inflight -= 1; - metrics.inflight -= 1; - - }).catch(err => { - priceFeeds[caption].inflight -= 1; - metrics.inflight -= 1; - metrics.errors += 1; - console.error(`[${tag}] Notarization failed: ${err}`); - }); - }) - .catch(err => { - console.warn(`[${tag}] ${debug ? `(after ${commas(Date.now() - dryRunStart)} msecs) ` : " "}Dry-run failed: ${err}`); - metrics.errors += 1; - }) - ); - + break; + } + const delay = (ms) => + new Promise((_resolve) => setTimeout(_resolve, ms)); + await delay(5000); + } while (status !== "solved"); + + }).then(() => { + priceFeeds[caption].inflight -= 1; + metrics.inflight -= 1; + + }).catch(err => { + priceFeeds[caption].inflight -= 1; + metrics.inflight -= 1; + metrics.errors += 1; + console.error(`[${tag}] Notarization failed: ${err}`); + }); + resolve(); + })); + if (threadBucket.length >= maxThreads) { await Promise.all(threadBucket) threadBucket = [] } } + if (threadBucket.length) { await Promise.all(threadBucket) } From 7cd14ea11d3c5eae4c0e18debbdbe66e78c8ccc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20D=C3=ADaz?= Date: Tue, 27 Jan 2026 12:16:20 +0100 Subject: [PATCH 2/2] chore: bump package version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index e7b6f94..05b0fa6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@witnet/price-feeds", - "version": "3.1.24", + "version": "3.1.25", "description": "Open-source collection of verified price data using Witnet on supported blockchains.", "author": "Witnet Foundation", "license": "MIT",