diff --git a/nodes/clarify_api.js b/nodes/clarify_api.js index 17260c5..b6a1f6c 100644 --- a/nodes/clarify_api.js +++ b/nodes/clarify_api.js @@ -15,6 +15,8 @@ const CredentialSchema = Joi.object({ module.exports = function (RED) { function ClarifyApiNode(config) { RED.nodes.createNode(this, config); + this.requests = new Set(); + if (this.credentials && this.credentials.credentialsFile) { try { let file = JSON.parse(this.credentials.credentialsFile); @@ -28,14 +30,34 @@ module.exports = function (RED) { console.error('Failed creating client: ', error); } } + + this.on('close', done => { + let requests = Array.from(this.requests); + Promise.allSettled(requests).finally(() => { + done(); + }); + }); } - ClarifyApiNode.prototype.saveSignals = function (payload) { - return this.client.saveSignals(payload); + ClarifyApiNode.prototype.handleRequest = async function (request) { + this.requests.add(request); + try { + return await request; + } catch (error) { + return Promise.reject(error); + } finally { + this.requests.delete(request); + } + }; + + ClarifyApiNode.prototype.saveSignals = async function (payload) { + let request = this.client.saveSignals(payload); + return await this.handleRequest(request); }; - ClarifyApiNode.prototype.insert = function (data) { - return this.client.insert(data); + ClarifyApiNode.prototype.insert = async function (data) { + let request = this.client.insert(data); + return await this.handleRequest(request); }; RED.httpAdmin.post('/validateToken', async function (request, response) { diff --git a/nodes/clarify_insert.js b/nodes/clarify_insert.js index 1ae3f3b..cb5fea6 100644 --- a/nodes/clarify_insert.js +++ b/nodes/clarify_insert.js @@ -18,6 +18,9 @@ class DataBuffer { clearTimeout(this.flushTimer); this.flushTimer = null; } + let buffer = [...this.buffer]; + this.buffer = []; + return buffer; } add(data) { @@ -123,9 +126,23 @@ module.exports = function (RED) { this.handleInput(msg, send, done); }); - this.on('close', () => { - this.dataFrameBuffer.cancel(); - this.saveSignalBuffer.cancel(); + this.on('close', done => { + let promises = []; + let dataFrames = this.dataFrameBuffer.cancel(); + let signals = this.saveSignalBuffer.cancel(); + if (dataFrames.length) { + let promise = this.flushDataFramesBuffer(dataFrames); + promises.push(promise); + } + + if (signals.length) { + let promise = this.flushSaveSignalBuffer(data); + promises.push(promise); + } + + Promise.allSettled(promises).finally(() => { + done(); + }); }); }