From 31354fd9d9df6134bd429084bbb13c75819315ff Mon Sep 17 00:00:00 2001 From: Bjarne Mogstad Date: Wed, 28 Sep 2022 15:14:22 +0200 Subject: [PATCH] Handle close event better The Clarify API node now waits until all of it requests has finished before closing itself. The Insert node sends all buffered data to the insert node. As the API node lives in the global space, it will be closed after the insert node, which means it will live for the lifespan of the request. --- nodes/clarify_api.js | 30 ++++++++++++++++++++++++++---- nodes/clarify_insert.js | 23 ++++++++++++++++++++--- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/nodes/clarify_api.js b/nodes/clarify_api.js index 17260c5..b6a1f6c 100644 --- a/nodes/clarify_api.js +++ b/nodes/clarify_api.js @@ -15,6 +15,8 @@ const CredentialSchema = Joi.object({ module.exports = function (RED) { function ClarifyApiNode(config) { RED.nodes.createNode(this, config); + this.requests = new Set(); + if (this.credentials && this.credentials.credentialsFile) { try { let file = JSON.parse(this.credentials.credentialsFile); @@ -28,14 +30,34 @@ module.exports = function (RED) { console.error('Failed creating client: ', error); } } + + this.on('close', done => { + let requests = Array.from(this.requests); + Promise.allSettled(requests).finally(() => { + done(); + }); + }); } - ClarifyApiNode.prototype.saveSignals = function (payload) { - return this.client.saveSignals(payload); + ClarifyApiNode.prototype.handleRequest = async function (request) { + this.requests.add(request); + try { + return await request; + } catch (error) { + return Promise.reject(error); + } finally { + this.requests.delete(request); + } + }; + + ClarifyApiNode.prototype.saveSignals = async function (payload) { + let request = this.client.saveSignals(payload); + return await this.handleRequest(request); }; - ClarifyApiNode.prototype.insert = function (data) { - return this.client.insert(data); + ClarifyApiNode.prototype.insert = async function (data) { + let request = this.client.insert(data); + return await this.handleRequest(request); }; RED.httpAdmin.post('/validateToken', async function (request, response) { diff --git a/nodes/clarify_insert.js b/nodes/clarify_insert.js index 1ae3f3b..cb5fea6 100644 --- a/nodes/clarify_insert.js +++ b/nodes/clarify_insert.js @@ -18,6 +18,9 @@ class DataBuffer { clearTimeout(this.flushTimer); this.flushTimer = null; } + let buffer = [...this.buffer]; + this.buffer = []; + return buffer; } add(data) { @@ -123,9 +126,23 @@ module.exports = function (RED) { this.handleInput(msg, send, done); }); - this.on('close', () => { - this.dataFrameBuffer.cancel(); - this.saveSignalBuffer.cancel(); + this.on('close', done => { + let promises = []; + let dataFrames = this.dataFrameBuffer.cancel(); + let signals = this.saveSignalBuffer.cancel(); + if (dataFrames.length) { + let promise = this.flushDataFramesBuffer(dataFrames); + promises.push(promise); + } + + if (signals.length) { + let promise = this.flushSaveSignalBuffer(data); + promises.push(promise); + } + + Promise.allSettled(promises).finally(() => { + done(); + }); }); }