Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@witnet/price-feeds",
"version": "3.1.24",
"version": "3.1.25",
"description": "Open-source collection of verified price data using Witnet on supported blockchains.",
"author": "Witnet Foundation",
"license": "MIT",
Expand Down
251 changes: 130 additions & 121 deletions src/bin/bots/notarizer.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async function main() {
.option(
"--max-threads <number>",
"Max. number of simultaneous dry runs",
process.env.WITNET_PFS_DRY_RUN_MAX_THREADS || 1,
process.env.WITNET_PFS_DRY_RUN_MAX_THREADS || 2,
)
.option(
"--min-balance <wits>",
Expand Down Expand Up @@ -218,139 +218,148 @@ async function main() {
}
}

// prepare and spawn new dry-run subprocess:
const dryRunStart = Date.now();
metrics.dryruns += 1;
console.debug(`[${tag}] Dry-running ${lastDryRunClock ? `after ${commas(dryRunStart - lastDryRunClock)} msecs` : `for the first time`} ...`);
priceFeeds[caption].lastDryRunClock = dryRunStart;
threadBucket.push(
request.execDryRun({ timeout: DRY_RUN_TIMEOUT_SECS * 1000 })
.then(output => {
if (!output || output === "") throw new Error(`no dry-run report`);
else return JSON.parse(output);
})
.then(json => {
// parse dry run result
console.debug(`[${tag}] Dry-run solved in ${commas(Date.now() - dryRunStart)} msecs => ${JSON.stringify(json)}`);
if (!Object.keys(json).includes("RadonInteger")) {
throw `Error: unexpected dry run result: ${JSON.stringify(json).slice(0, 2048)}`;
}
const currentValue = parseInt(json.RadonInteger, 10);

// determine whether a new notarization is required
const heartbeatSecs = Math.floor(Date.now() / 1000) - lastUpdates[caption].timestamp;
if (heartbeatSecs < conditions.heartbeatSecs / 2 + 1) {
const deviation =
lastUpdates[caption].value > 0
? (100 * (currentValue - lastUpdates[caption].value)) /
lastUpdates[caption].value
: 0;
if (Math.abs(deviation) < conditions.deviationPercentage) {
console.info(
`[${tag}] ${deviation >= 0 ? "+" : ""}${deviation.toFixed(2)} % deviation after ${heartbeatSecs} secs.`
)
return;
} else {
console.info(
`[${tag}] Updating due to price deviation of ${deviation.toFixed(2)} % ...`,
);
}
threadBucket.push(new Promise(async (resolve, reject) => {

// determine whether a new notarization is required
const heartbeatSecs = Math.floor(Date.now() / 1000) - lastUpdates[caption].timestamp;
if (heartbeatSecs < conditions.heartbeatSecs / 2 + 1) {
// prepare dry-run subprocess
const dryRunStart = Date.now();
metrics.dryruns += 1;
console.debug(`[${tag}] Dry-running ${lastDryRunClock ? `after ${commas(dryRunStart - lastDryRunClock)} msecs` : `for the first time`} ...`);
priceFeeds[caption].lastDryRunClock = dryRunStart;

// determine current market value
const currentValue = await request.execDryRun({ timeout: DRY_RUN_TIMEOUT_SECS * 1000 })
.then(output => {
if (!output || output === "") throw new Error(`no dry-run report`);
else return JSON.parse(output);
})
.then(json => {
// parse dry run result
console.debug(`[${tag}] Dry-run solved in ${commas(Date.now() - dryRunStart)} msecs => ${JSON.stringify(json)}`);
if (!Object.keys(json).includes("RadonInteger")) {
throw new Error(`unexpected dry run result: ${JSON.stringify(json).slice(0, 2048)}`);
} else {
return parseInt(json.RadonInteger, 10);
}
})
.catch(err => {
console.warn(`[${tag}] ${debug ? `(after ${commas(Date.now() - dryRunStart)} msecs) ` : " "}Dry-run failed: ${err}`);
metrics.errors += 1;
return null;
});

// skip notarization if the dry-run failed
if (currentValue === null) reject();

// compute and evaluate current deviation with respect to last notarized value
const deviation = lastUpdates[caption].value > 0
? (100 * (currentValue - lastUpdates[caption].value)) /
lastUpdates[caption].value
: 0;
if (Math.abs(deviation) < conditions.deviationPercentage) {
console.info(
`[${tag}] ${deviation >= 0 ? "+" : ""}${deviation.toFixed(2)} % deviation after ${heartbeatSecs} secs.`
)
resolve(); // skip notarization
} else {
console.info(
`[${tag}] Updating due to heartbeat after ${heartbeatSecs} secs ...`,
`[${tag}] Updating due to price deviation of ${deviation.toFixed(2)} % ...`,
);
}
} else {
console.info(
`[${tag}] Updating due to heartbeat after ${heartbeatSecs} secs ...`,
);
}

// create and sign and send new data request transaction
console.debug(`[${tag}] Cache info before sending =>`, ledger.cacheInfo);
const DRs = Witnet.DataRequests.from(ledger, request);
metrics.inflight += 1;
priceFeeds[caption].inflight = (priceFeeds[caption].inflight || 0) + 1;

// send data request transaction and wait for the notarization of a new price update
await DRs.sendTransaction({
fees: priority,
witnesses: conditions.minWitnesses,
}).then(tx => {
console.info(`[${tag}] Sending data request transaction => { radHash: ${tx.radHash
} inputs: ${tx.tx?.DataRequest?.signatures.length
} cost: ${Witnet.Coins.fromNanowits(tx.fees.nanowits + tx.value?.nanowits).wits
} weight: ${commas(tx.weight)
} witnesses: ${tx.witnesses
} hash: ${tx.hash
} }`);
metrics.nanowits += tx.fees.nanowits + tx.value?.nanowits;
metrics.queries += 1;
return DRs.confirmTransaction(tx.hash, {
onStatusChange: () => console.info(`[${tag}] DRT status =>`, tx.status),
})

}).then(async tx => {
console.debug(
`[${tag}] Cache info after confirmation =>`,
ledger.cacheInfo,
);

console.debug(`[${tag}] Cache info before sending =>`, ledger.cacheInfo);

// create, sign and send new data request transaction
const DRs = Witnet.DataRequests.from(ledger, request);
metrics.inflight += 1;
priceFeeds[caption].inflight = (priceFeeds[caption].inflight || 0) + 1;

// launch promise for the notarization of new price update
DRs.sendTransaction({
fees: priority,
witnesses: conditions.minWitnesses,
}).then(tx => {
console.info(`[${tag}] Sending data request transaction => { radHash: ${tx.radHash
} inputs: ${tx.tx?.DataRequest?.signatures.length
} cost: ${Witnet.Coins.fromNanowits(tx.fees.nanowits + tx.value?.nanowits).wits
} weight: ${commas(tx.weight)
} witnesses: ${tx.witnesses
} hash: ${tx.hash
} }`);
metrics.nanowits += tx.fees.nanowits + tx.value?.nanowits;
metrics.queries += 1;
return DRs.confirmTransaction(tx.hash, {
onStatusChange: () => console.info(`[${tag}] DRT status =>`, tx.status),
})

}).then(async tx => {
console.debug(
`[${tag}] Cache info after confirmation =>`,
ledger.cacheInfo,
// await resolution in Witnet
let status = tx.status;
do {
const report = await ledger.provider.getDataRequest(
tx.hash,
"ethereal",
);

// await resolution in Witnet
let status = tx.status;
do {
const report = await ledger.provider.getDataRequest(
tx.hash,
"ethereal",
if (report.status !== status) {
status = report.status;
console.info(`[${tag}] DRT status =>`, report.status);
}
if (report.status === "solved" && report?.result) {
const result = utils.cbor.decode(
utils.fromHexString(report.result.cbor_bytes),
);
if (report.status !== status) {
status = report.status;
console.info(`[${tag}] DRT status =>`, report.status);
}
if (report.status === "solved" && report?.result) {
const result = utils.cbor.decode(
utils.fromHexString(report.result.cbor_bytes),
);
if (Number.isInteger(result)) {
lastUpdates[caption].timestamp = report.result.timestamp;
lastUpdates[caption].value = parseInt(result, 10);
const { value, timestamp } = lastUpdates[caption]
const providers = request.sources
.map(source => {
let parts = source.authority.split(".").slice(-2);
parts[0] = parts[0][0].toUpperCase() + parts[0].slice(1);
return parts.join(".")
})
.sort();
console.info(`[${tag}] DRT result => { value: ${value}, ts: ${moment.unix(timestamp).format("MMM Do YYYY HH:mm:ss")}, providers: ${providers.join(" ")} }`);
} else {
throw `Unexpected DRT result => ${result}`;
}
break;
if (Number.isInteger(result)) {
lastUpdates[caption].timestamp = report.result.timestamp;
lastUpdates[caption].value = parseInt(result, 10);
const { value, timestamp } = lastUpdates[caption]
const providers = request.sources
.map(source => {
let parts = source.authority.split(".").slice(-2);
parts[0] = parts[0][0].toUpperCase() + parts[0].slice(1);
return parts.join(".")
})
.sort();
console.info(`[${tag}] DRT result => { value: ${value}, ts: ${moment.unix(timestamp).format("MMM Do YYYY HH:mm:ss")}, providers: ${providers.join(" ")} }`);
} else {
throw `Unexpected DRT result => ${result}`;
}
const delay = (ms) =>
new Promise((_resolve) => setTimeout(_resolve, ms));
await delay(5000);
} while (status !== "solved");

}).then(() => {
priceFeeds[caption].inflight -= 1;
metrics.inflight -= 1;

}).catch(err => {
priceFeeds[caption].inflight -= 1;
metrics.inflight -= 1;
metrics.errors += 1;
console.error(`[${tag}] Notarization failed: ${err}`);
});
})
.catch(err => {
console.warn(`[${tag}] ${debug ? `(after ${commas(Date.now() - dryRunStart)} msecs) ` : " "}Dry-run failed: ${err}`);
metrics.errors += 1;
})
);

break;
}
const delay = (ms) =>
new Promise((_resolve) => setTimeout(_resolve, ms));
await delay(5000);
} while (status !== "solved");

}).then(() => {
priceFeeds[caption].inflight -= 1;
metrics.inflight -= 1;

}).catch(err => {
priceFeeds[caption].inflight -= 1;
metrics.inflight -= 1;
metrics.errors += 1;
console.error(`[${tag}] Notarization failed: ${err}`);
});
resolve();
}));

if (threadBucket.length >= maxThreads) {
await Promise.all(threadBucket)
threadBucket = []
}
}

if (threadBucket.length) {
await Promise.all(threadBucket)
}
Expand Down