diff --git a/src/core/workflow/nodes/timer.js b/src/core/workflow/nodes/timer.js index 522057e5..c6118aaa 100644 --- a/src/core/workflow/nodes/timer.js +++ b/src/core/workflow/nodes/timer.js @@ -99,6 +99,38 @@ class TimerSystemTaskNode extends SystemTaskNode { return bag; } + async run({ bag = {}, input = {}, external_input = {}, actor_data = {}, environment = {}, parameters = {} }, lisp) { + const hrt_run_start = process.hrtime(); + try { + const execution_data = this._preProcessing({ bag, input, actor_data, environment, parameters }); + const [result, status] = await this._run(execution_data, lisp); + + let next_node_id = this.next(result); + if (status === ProcessStatus.PENDING) { + next_node_id = this.id; + } + + const hrt_run_interval = process.hrtime(hrt_run_start); + const time_elapsed = Math.ceil(hrt_run_interval[0] * 1000 + hrt_run_interval[1] / 1000000); + const node_extract = this._spec?.extract; + + return { + node_id: this.id, + bag: this._setBag(bag, result, parameters?._extract, node_extract), + external_input: external_input, + result: result, + error: null, + status: status, + next_node_id: next_node_id, + time_elapsed: time_elapsed, + }; + } catch (err) { + const hrt_run_interval = process.hrtime(hrt_run_start); + const time_elapsed = Math.ceil(hrt_run_interval[0] * 1000 + hrt_run_interval[1] / 1000000); + return this._processError(err, { bag, external_input, time_elapsed }); + } + } + // eslint-disable-next-line no-unused-vars async _run(execution_data = {}) { if (!execution_data.timeout) { diff --git a/src/core/workflow/process.js b/src/core/workflow/process.js index c0dfad94..e4548e7e 100644 --- a/src/core/workflow/process.js +++ b/src/core/workflow/process.js @@ -693,7 +693,25 @@ class Process extends PersistedEntity { case ProcessStatus.EXPIRED: break; case ProcessStatus.PENDING: - await this.runPendingProcess(timer.params.actor_data, trx); + const step_number = await this.getNextStepNumber(); + this.state.result.step_number = step_number; + const current_node = this._blueprint.fetchNode(this._state.node_id); + const next_node_id = current_node.next(this.state.result); + this.state = new ProcessState( + this.id, + step_number, + this._state.node_id, + this._state.bag, + null, + this.state.result, + null, + ProcessStatus.RUNNING, + next_node_id, + this._state.actor_data, + null + ); + await this.save(trx); + await this._notifyProcessState({}); break; case ProcessStatus.RUNNING: //TODO: Avaliar como expirar um processo running. diff --git a/src/engine/tests/integration/engine_processes.test.js b/src/engine/tests/integration/engine_processes.test.js index 4b245833..ade945b1 100644 --- a/src/engine/tests/integration/engine_processes.test.js +++ b/src/engine/tests/integration/engine_processes.test.js @@ -406,9 +406,9 @@ test("run process with http retries", async () => { workflow_process = await engine.runProcess(workflow_process.id, actors_.simpleton); - await sleep(5000); + await sleep(7000); - expect(process_state_history).toHaveLength(6); + expect(process_state_history).toHaveLength(8); let http_script = process_state_history[2]; expect(http_script.node_id).toEqual("2"); @@ -420,23 +420,23 @@ test("run process with http retries", async () => { timeout: 1, }); - http_script = process_state_history[3]; + http_script = process_state_history[4]; expect(http_script.node_id).toEqual("2"); expect(http_script.result).toEqual({ attempt: 2, data: "", status: 503, - step_number: 4, + step_number: 5, timeout: 1, }); - http_script = process_state_history[4]; + http_script = process_state_history[6]; expect(http_script.node_id).toEqual("2"); expect(http_script.result).toEqual({ attempt: 3, data: "", status: 503, - step_number: 5, + step_number: 7, timeout: 1, }); } finally {