diff --git a/.editorconfig b/.editorconfig index d70c2286e..dc8b8f095 100644 --- a/.editorconfig +++ b/.editorconfig @@ -5,7 +5,7 @@ indent_style = space indent_size = 4 trim_trailing_whitespace = true insert_final_newline = true -max_line_length = 120 +max_line_length = 180 end_of_line = lf charset = utf-8 diff --git a/.eslintrc b/.eslintrc index 3563e60e0..ff90a7755 100644 --- a/.eslintrc +++ b/.eslintrc @@ -209,9 +209,9 @@ "max-len": [ "warn", { - "code": 120, + "code": 180, "tabWidth": 4, - "comments": 120, + "comments": 180, "ignoreComments": false, "ignoreTrailingComments": true, "ignoreUrls": true, diff --git a/.vscode/launch.json b/.vscode/launch.json index b0d386ce5..d43e43383 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -31,6 +31,15 @@ "outFiles": [ "${workspaceFolder}/**/*.js" ] + }, + { + "name": "Attach by Process ID", + "processId": "${command:PickProcess}", + "request": "attach", + "skipFiles": [ + "/**" + ], + "type": "node" } ] } diff --git a/bdd/features/e2e/E2E-010-cli.feature b/bdd/features/e2e/E2E-010-cli.feature index 098652ebf..0bde80f0c 100644 --- a/bdd/features/e2e/E2E-010-cli.feature +++ b/bdd/features/e2e/E2E-010-cli.feature @@ -46,6 +46,7 @@ Feature: CLI tests @ci-api @cli Scenario: E2E-010 TC-006 Test Sequence 'prune --force' option + Given I set config for local Hub When I execute CLI with "seq send ../packages/checksum-sequence.tar.gz" When I execute CLI with "seq send ../packages/csv-transform.tar.gz" When I execute CLI with "seq list" diff --git a/bdd/features/e2e/E2E-011-cli-topic.feature b/bdd/features/e2e/E2E-011-cli-topic.feature index e26b0e3ca..b39e3a9a0 100644 --- a/bdd/features/e2e/E2E-011-cli-topic.feature +++ b/bdd/features/e2e/E2E-011-cli-topic.feature @@ -19,6 +19,7 @@ This feature checks topic functionalities over CLI @ci-topic @cli Scenario: E2E-011 TC-003 API to Instance + # Given I set config for local Hub When I execute CLI with "topic send avengers data/data.json" without waiting for the end When I execute CLI with "seq send ../packages/hello-input-out.tar.gz" When I execute CLI with "seq start - --input-topic avengers " diff --git a/bdd/features/e2e/E2E-015-unified.feature b/bdd/features/e2e/E2E-015-unified.feature index 46f9338a1..25c4c9d9e 100644 --- a/bdd/features/e2e/E2E-015-unified.feature +++ b/bdd/features/e2e/E2E-015-unified.feature @@ -23,6 +23,7 @@ Feature: Test our shiny new Python runner Given host is running When find and upload sequence "debug-args.tar.gz" And instance started with arguments "foo 3" + And wait for "1000" ms Then "output" is "{\"first_arg\":\"foo\",\"second_arg\":\"3\"}" And host is still running diff --git a/bdd/step-definitions/e2e/host-steps.ts b/bdd/step-definitions/e2e/host-steps.ts index c9f6fbacb..5fc996f6b 100644 --- a/bdd/step-definitions/e2e/host-steps.ts +++ b/bdd/step-definitions/e2e/host-steps.ts @@ -99,23 +99,42 @@ const waitForProcessToEnd = async (pid: number) => { } }; -const killRunner = async () => { - if (process.env.RUNTIME_ADAPTER === "kubernetes") { - // @TODO - return; +// const killRunner = async () => { +// if (process.env.RUNTIME_ADAPTER === "kubernetes") { +// // @TODO +// return; +// } + +// if (process.env.RUNTIME_ADAPTER === "process" && processId) { +// try { +// process.kill(processId); +// await waitForProcessToEnd(processId); +// } catch (e) { +// console.error("Couldn't kill runner", e); +// } +// } + +// if (process.env.RUNTIME_ADAPTER === "docker" && containerId) { +// await dockerode.getContainer(containerId).kill(); +// } +// }; + +const killAllRunners = async () => { + if (process.env.RUNTIME_ADAPTER === "process") { + exec("killall runner"); } - if (process.env.RUNTIME_ADAPTER === "process" && processId) { - try { - process.kill(processId); - await waitForProcessToEnd(processId); - } catch (e) { - console.error("Couldn't kill runner", e); - } - } + if (process.env.RUNTIME_ADAPTER === "docker") { + await Promise.all( + (await dockerode.listContainers()) + .map(async container => { + if (container.Labels["scramjet.instance.id"]) { + return dockerode.getContainer(container.Id).kill(); + } - if (process.env.RUNTIME_ADAPTER === "docker" && containerId) { - await dockerode.getContainer(containerId).kill(); + return Promise.resolve(); + }) + ); } }; @@ -183,7 +202,20 @@ Before(() => { streams = {}; }); -After({ tags: "@runner-cleanup" }, killRunner); +After({ tags: "@runner-cleanup" }, killAllRunners); +After({}, async () => { + let insts = []; + + try { + insts = await hostClient.listInstances(); + } catch (_e) { + return; + } + + await Promise.all( + insts.map(i => hostClient.getInstanceClient(i.id).kill({ removeImmediately: true }).catch(_e => {})) + ); +}); Before({ tags: "@test-si-init" }, function() { createDirectory("data/template_seq"); @@ -499,29 +531,39 @@ When("send kill message to instance", async function(this: CustomWorld) { assert.ok(resp); }); +// eslint-disable-next-line complexity When("get runner PID", { timeout: 31000 }, async function(this: CustomWorld) { let success: any; let tries = 0; - while (!success && tries < 3) { - if (process.env.RUNTIME_ADAPTER === "kubernetes") { - // @TODO - return; - } - - if (process.env.RUNTIME_ADAPTER === "process") { - const res = (await this.resources.instance?.getHealth())?.processId; - - if (res) { - processId = success = res; - console.log("Process is identified.", processId); - } - } else { - containerId = success = (await this.resources.instance?.getHealth())?.containerId!; + const adapter = process.env.RUNTIME_ADAPTER; - if (containerId) { - console.log("Container is identified.", containerId); - } + while (!success && tries < 3) { + const health = await this.resources.instance?.getHealth(); + + console.log("Health", health); + + switch (adapter) { + case "kubernetes": + return; + case "docker": + + containerId = success = health?.containerId!; + + if (containerId) { + console.log("Container is identified.", containerId); + } + break; + case "process": + const res = health?.processId; + + if (res) { + processId = success = res; + console.log("Process is identified.", processId); + } + break; + default: + break; } tries++; diff --git a/package.json b/package.json index 148726b82..ede9ad7d0 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "lint": "TIMING=1 NODE_OPTIONS=\"--max-old-space-size=2048\" scripts/run-script.js -w modules -j 4 -e \"! ls .eslintrc* > /dev/null || npx eslint ./ --ext .ts --ext .js --cache --cache-strategy=content\"", "lint:uncached": "find . -name .eslintcache -delete && yarn lint", "start": "DEVELOPMENT=true node dist/sth/bin/hub.js", - "start:dev": "DEVELOPMENT=true ts-node packages/sth/src/bin/hub.ts", + "start:dev": "ts-node packages/sth/src/bin/hub.ts", "start:dev:cli": "DEVELOPMENT=true ts-node packages/cli/src/bin/index.ts", "install:clean": "yarn clean && yarn clean:modules && yarn install", "postinstall": "scripts/run-script.js -v -w modules install:deps", diff --git a/packages/adapters/src/docker-instance-adapter.ts b/packages/adapters/src/docker-instance-adapter.ts index fe0d44b66..1f673e6ce 100644 --- a/packages/adapters/src/docker-instance-adapter.ts +++ b/packages/adapters/src/docker-instance-adapter.ts @@ -12,6 +12,7 @@ import { RunnerContainerConfiguration, InstanceLimits, STHConfiguration, + SequenceInfo, } from "@scramjet/types"; import path from "path"; import { DockerodeDockerHelper } from "./dockerode-docker-helper"; @@ -21,6 +22,7 @@ import { STH_DOCKER_NETWORK, isHostSpawnedInDockerContainer, getHostname } from import { ObjLogger } from "@scramjet/obj-logger"; import { getRunnerEnvEntries } from "./get-runner-env"; import { Readable } from "stream"; +import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; /** * Adapter for running Instance by Runner executed in Docker container. @@ -114,22 +116,22 @@ IComponent { * @returns {Promise} Promise resolved with container statistics. */ async stats(msg: MonitoringMessageData): Promise { - if (this.resources.containerId) { - const stats = await this.dockerHelper.stats(this.resources.containerId)!; + this.logger.debug("STATS. Container id:", this.resources.containerId); - return { - cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage, - healthy: msg.healthy, - limit: stats.memory_stats?.limit, - memoryMaxUsage: stats.memory_stats?.max_usage, - memoryUsage: stats.memory_stats?.usage, - networkRx: stats.networks?.eth0?.rx_bytes, - networkTx: stats.networks?.eth0?.tx_bytes, - containerId: this.resources.containerId - }; - } + this.resources.containerId ||= await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", this.id); + + const stats = await this.dockerHelper.stats(this.resources.containerId)!; - return msg; + return { + cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage, + healthy: msg.healthy, + limit: stats.memory_stats?.limit, + memoryMaxUsage: stats.memory_stats?.max_usage, + memoryUsage: stats.memory_stats?.usage, + networkRx: stats.networks?.eth0?.rx_bytes, + networkTx: stats.networks?.eth0?.tx_bytes, + containerId: this.resources.containerId + }; } private async getNetworkSetup(): Promise<{ network: string, host: string }> { @@ -169,8 +171,21 @@ IComponent { }; } + async setRunner(system: Record): Promise { + const containerId = await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", system.id); + + this.logger.debug("Container id restored", containerId); + + this.resources.containerId = containerId; + } + + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload); + return this.waitUntilExit(config, instanceId, sequenceInfo); + } + // eslint-disable-next-line complexity - async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise { + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { if (!(config.type === "docker" && "container" in config)) { throw new Error("Docker instance adapter run with invalid runner config"); } @@ -193,7 +208,9 @@ IComponent { instancesServerPort, instancesServerHost: networkSetup.host, instanceId, - pipesPath: "" + pipesPath: "", + sequenceInfo, + payload }, { ...this.sthConfig.runnerEnvs }).map(([k, v]) => `${k}=${v}`); @@ -207,7 +224,8 @@ IComponent { { mountPoint: config.sequenceDir, volume: config.id, writeable: false } ], labels: { - "scramjet.sequence.name": config.name + "scramjet.sequence.name": config.name, + "scramjet.instance.id": instanceId }, ports: this.resources.ports, publishAllPorts: true, @@ -220,12 +238,20 @@ IComponent { this.crashLogStreams = Promise.all(([streams.stdout, streams.stderr] as Readable[]).map(streamToString)); - this.resources.containerId = containerId; + this.resources.containerId = containerId; // doesnt matter this.logger.trace("Container is running", containerId); + return 0; + } + + async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise { try { - const { statusCode } = await this.dockerHelper.wait(containerId); + this.resources.containerId = this.resources.containerId || await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", instanceId); + + this.logger.debug("Wait for container exit...", this.resources.containerId); + + const { statusCode } = await this.dockerHelper.wait(this.resources.containerId); this.logger.debug("Container exited", statusCode); diff --git a/packages/adapters/src/dockerode-docker-helper.ts b/packages/adapters/src/dockerode-docker-helper.ts index f0c2db067..bf75aaac9 100644 --- a/packages/adapters/src/dockerode-docker-helper.ts +++ b/packages/adapters/src/dockerode-docker-helper.ts @@ -150,6 +150,12 @@ export class DockerodeDockerHelper implements IDockerHelper { return id; } + async getContainerIdByLabel(label: string, value: string): Promise { + const result = await this.dockerode.listContainers({ label: `${label}=${value}` }); + + return result[0]!.Id; + } + /** * Start container with provided id. * diff --git a/packages/adapters/src/get-runner-env.ts b/packages/adapters/src/get-runner-env.ts index e658ac99e..7cdb9cbd6 100644 --- a/packages/adapters/src/get-runner-env.ts +++ b/packages/adapters/src/get-runner-env.ts @@ -9,7 +9,7 @@ import { RunnerEnvConfig, RunnerEnvironmentVariables } from "./types"; * @returns env vars */ export function getRunnerEnvVariables({ - sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix" + sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix", sequenceInfo, payload }: RunnerEnvConfig, extra: Record = {}): RunnerEnvironmentVariables { const join = path[paths].join; @@ -23,6 +23,8 @@ export function getRunnerEnvVariables({ INSTANCE_ID: instanceId, PIPES_LOCATION: pipesPath, CRASH_LOG: join(pipesPath, "crash_log"), + SEQUENCE_INFO: JSON.stringify(sequenceInfo), + RUNNER_CONNECT_INFO: JSON.stringify(payload), ...extra }; } diff --git a/packages/adapters/src/kubernetes-instance-adapter.ts b/packages/adapters/src/kubernetes-instance-adapter.ts index 65887c5ab..7be572307 100644 --- a/packages/adapters/src/kubernetes-instance-adapter.ts +++ b/packages/adapters/src/kubernetes-instance-adapter.ts @@ -9,17 +9,19 @@ import { IObjectLogger, K8SAdapterConfiguration, MonitoringMessageData, + SequenceInfo, STHConfiguration, } from "@scramjet/types"; -import path from "path"; import { ObjLogger } from "@scramjet/obj-logger"; +import { RunnerExitCode } from "@scramjet/symbols"; +import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; import { createReadStream } from "fs"; +import path from "path"; +import { PassThrough } from "stream"; +import { getRunnerEnvEntries } from "./get-runner-env"; import { KubernetesClientAdapter } from "./kubernetes-client-adapter"; import { adapterConfigDecoder } from "./kubernetes-config-decoder"; -import { getRunnerEnvEntries } from "./get-runner-env"; -import { PassThrough } from "stream"; -import { RunnerExitCode } from "@scramjet/symbols"; /** * Adapter for running Instance by Runner executed in separate process. @@ -38,6 +40,8 @@ IComponent { private adapterConfig: K8SAdapterConfiguration; private _limits?: InstanceLimits = {}; + stdErrorStream?: PassThrough; + get limits() { return this._limits || {} as InstanceLimits; } private set limits(value: InstanceLimits) { this._limits = value; } @@ -58,7 +62,8 @@ IComponent { private get kubeClient() { if (!this._kubeClient) { - throw new Error("Kubernetes client not initialized"); + this._kubeClient = new KubernetesClientAdapter(this.adapterConfig.authConfigPath, this.adapterConfig.namespace); + this._kubeClient.init(); } return this._kubeClient; @@ -90,8 +95,7 @@ IComponent { } }; } - - async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise { + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { if (config.type !== "kubernetes") { throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`); } @@ -112,7 +116,9 @@ IComponent { instancesServerPort, instancesServerHost: this.adapterConfig.sthPodHost, instanceId, - pipesPath: "" + pipesPath: "", + sequenceInfo, + payload }, { ...this.sthConfig.runnerEnvs }).map(([name, value]) => ({ name, value })); @@ -143,8 +149,12 @@ IComponent { 2 ); + this.logger.debug("Runner Pod created"); + const startPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Running", "Failed"]); + this.logger.debug("Runner Pod status"); + if (startPodStatus.status === "Failed") { this.logger.error("Runner unable to start", startPodStatus); @@ -153,25 +163,35 @@ IComponent { // This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly. // Return 1 which is Linux exit code for "General Error" since we are not able // to determine what happened exactly. - return startPodStatus.code || 137; + return RunnerExitCode.UNCAUGHT_EXCEPTION; } this.logger.debug("Copy sequence files to Runner"); const compressedStream = createReadStream(path.join(config.sequenceDir, "compressed.tar.gz")); - const stdErrorStream = new PassThrough(); - stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); }); + this.stdErrorStream = new PassThrough(); + this.stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); }); + + await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, this.stdErrorStream, compressedStream, 2); + + this.logger.debug("Copy command done"); + + return 0; + } + + async waitUntilExit(_config: InstanceConfig, instanceId: string, _sequenceInfo: SequenceInfo): Promise { + this.logger.info("Waiting for pod exit..."); - await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, stdErrorStream, compressedStream, 2); + this._runnerName ||= `runner-${ instanceId }`; - const exitPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Succeeded", "Failed", "Unknown"]); + const exitPodStatus = await this.kubeClient.waitForPodStatus(this._runnerName!, ["Succeeded", "Failed", "Unknown"]); - stdErrorStream.end(); + this.stdErrorStream?.end(); if (exitPodStatus.status !== "Succeeded") { this.logger.error("Runner stopped incorrectly", exitPodStatus); - this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(runnerName)); + this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(this._runnerName!)); return exitPodStatus.code || 137; } @@ -184,6 +204,11 @@ IComponent { return 0; } + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload); + return this.waitUntilExit(config, instanceId, sequenceInfo); + } + async cleanup(): Promise { await this.remove(this.adapterConfig.timeout); } diff --git a/packages/adapters/src/process-instance-adapter.ts b/packages/adapters/src/process-instance-adapter.ts index b968c3277..9574c8d29 100644 --- a/packages/adapters/src/process-instance-adapter.ts +++ b/packages/adapters/src/process-instance-adapter.ts @@ -1,18 +1,23 @@ import { ObjLogger } from "@scramjet/obj-logger"; -import { streamToString } from "@scramjet/utility"; -import { STHConfiguration, +import { ExitCode, IComponent, ILifeCycleAdapterMain, ILifeCycleAdapterRun, + IObjectLogger, InstanceConfig, InstanceLimits, - IObjectLogger, MonitoringMessageData, - SequenceConfig + STHConfiguration, + SequenceConfig, + SequenceInfo } from "@scramjet/types"; +import { streamToString } from "@scramjet/utility"; import { ChildProcess, spawn } from "child_process"; +import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; +import { constants } from "fs"; +import { access, readFile, rm } from "fs/promises"; import path from "path"; import { getRunnerEnvVariables } from "./get-runner-env"; @@ -29,6 +34,10 @@ class ProcessInstanceAdapter implements logger: IObjectLogger; sthConfig: STHConfiguration; + processPID: number = -1; + exitCode = -1; + id?: string | undefined; + private runnerProcess?: ChildProcess; private crashLogStreams?: Promise; private _limits?: InstanceLimits = {}; @@ -47,12 +56,16 @@ class ProcessInstanceAdapter implements async init(): Promise { // noop } + async stats(msg: MonitoringMessageData): Promise { const { runnerProcess } = this; if (!runnerProcess) { // Runner process not initialized yet - return msg; + return { + ...msg, + processId: this.processPID + }; } return { @@ -115,7 +128,17 @@ class ProcessInstanceAdapter implements return pythonpath; } - async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise { + setRunner(system: Record): void { + this.logger.info("Setting system from runner", system); + this.processPID = parseInt(system.processPID, 10); + } + + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload); + return this.waitUntilExit(config, instanceId, sequenceInfo); + } + + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { if (config.type !== "process") { throw new Error("Process instance adapter run with invalid runner config"); } @@ -134,7 +157,9 @@ class ProcessInstanceAdapter implements instancesServerHost: "127.0.0.1", instancesServerPort, instanceId, - pipesPath: "" + pipesPath: "", + sequenceInfo, + payload }, { PYTHONPATH: this.getPythonpath(config.sequenceDir), ...this.sthConfig.runnerEnvs @@ -143,32 +168,100 @@ class ProcessInstanceAdapter implements this.logger.debug("Spawning Runner process with command", runnerCommand); this.logger.trace("Runner process environment", env); - const runnerProcess = spawn(runnerCommand[0], runnerCommand.slice(1), { env }); + const runnerProcess = spawn(runnerCommand[0], runnerCommand.slice(1), { env, detached: true }); + + runnerProcess.unref(); + + runnerProcess.on("exit", (code) => { + this.exitCode = Number(code) || -1; + this.logger.info("Runner exit code", code); + }); this.crashLogStreams = Promise.all([runnerProcess.stdout, runnerProcess.stderr].map(streamToString)); + this.runnerProcess = runnerProcess; + this.logger.trace("Runner process is running", runnerProcess.pid); - this.runnerProcess = runnerProcess; + return 0; + } - const [statusCode, signal] = await new Promise<[number | null, NodeJS.Signals | null]>( - (res) => runnerProcess.on("exit", (code, sig) => res([code, sig])) - ); + getRunnerInfo(): RunnerConnectInfo["system"] { + return { + processPID: this.processPID.toString() + }; + } - this.logger.trace("Runner process exited", runnerProcess.pid); + async waitUntilExit(_config: InstanceConfig, _instanceId: string, _sequenceInfo: SequenceInfo): Promise { + if (this.runnerProcess) { + const [statusCode, signal] = await new Promise<[number | null, NodeJS.Signals | null]>( + (res) => { + if (this.exitCode > -1) { + res([this.exitCode, null]); + } - if (statusCode === null) { - this.logger.warn("Runner was killed by a signal, and didn't return a status code", signal); + this.runnerProcess?.on("exit", (code, sig) => res([code, sig])); + } + ); - // Probably SIGIKLL - return 137; - } + this.logger.trace("Runner process exited", this.runnerProcess?.pid); + + if (statusCode === null) { + this.logger.warn("Runner was killed by a signal, and didn't return a status code", signal); + + // Probably SIGIKLL + return 137; + } - if (statusCode > 0) { - this.logger.debug("Process returned non-zero status code", statusCode); + if (statusCode > 0) { + this.logger.debug("Process returned non-zero status code", statusCode); + } + + return statusCode; } - return statusCode; + // When no process reference Wait for file created by runner + return new Promise((res, reject) => { + const interval = setInterval(async () => { + if (this.processPID < 1) return; + + const filePath = `/tmp/runner-${this.processPID}`; + + try { + await access(filePath, constants.F_OK); + + clearInterval(interval); + + const data = await readFile(filePath, "utf8").catch((readErr) => { + this.logger.error(`Cant' read runner exit code from: ${readErr}`); + reject(readErr); + return; + }); + + this.logger.debug("exitCode saved to file by runner:", data, filePath); + + rm(filePath).then(() => { + this.logger.debug("File removed"); + }, (err: any) => { + this.logger.error("Can't remove exitcode file", err); + }); + + res(parseInt(data!, 10)); + } catch (err) { + /** OK. file not exists. check if process is*/ + + try { + process.kill(this.processPID, 0); + } catch (e) { + this.logger.error("Runner process not exists", e); + + clearInterval(interval); + + reject("pid not exists"); + } + } + }, 1000); + }); } /** @@ -188,7 +281,11 @@ class ProcessInstanceAdapter implements * Forcefully stops Runner process. */ async remove() { - this.runnerProcess?.kill(); + if (this.runnerProcess) { + this.runnerProcess.kill(); + } else { + spawn("kill", ["-9", this.processPID.toString()]); + } } async getCrashLog(): Promise { diff --git a/packages/adapters/src/types.ts b/packages/adapters/src/types.ts index 71ffcec83..d140c2b22 100644 --- a/packages/adapters/src/types.ts +++ b/packages/adapters/src/types.ts @@ -1,4 +1,5 @@ -import { ExitCode, InstanceId, IObjectLogger } from "@scramjet/types"; +import { ExitCode, InstanceId, IObjectLogger, SequenceInfo } from "@scramjet/types"; +import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth"; import { ContainerStats, NetworkInspectInfo } from "dockerode"; import { PathLike } from "fs"; import { Stream, Writable } from "stream"; @@ -189,6 +190,14 @@ export interface IDockerHelper { */ translateVolumesConfig: (volumeConfigs: DockerAdapterVolumeConfig[]) => any; + /** + * Gets first found container by a given label + * + * @param {string} label the label + * @param {string} value label value. + */ + getContainerIdByLabel(label: string, value: string): Promise; + /** * Creates Docker container from provided image with attached volumes and local directories. * @@ -317,6 +326,8 @@ export type RunnerEnvConfig = { instancesServerPort: number; instancesServerHost: string; instanceId: InstanceId; + sequenceInfo: SequenceInfo + payload: StartSequencePayload } export type RunnerEnvironmentVariables = Partial<{ diff --git a/packages/host/src/lib/cpm-connector.ts b/packages/host/src/lib/cpm-connector.ts index 549fa8742..2a4ad8e9a 100644 --- a/packages/host/src/lib/cpm-connector.ts +++ b/packages/host/src/lib/cpm-connector.ts @@ -2,7 +2,7 @@ import fs from "fs"; import { Readable } from "stream"; import * as http from "http"; -import { CPMMessageCode, InstanceMessageCode, SequenceMessageCode } from "@scramjet/symbols"; +import { CPMMessageCode, SequenceMessageCode } from "@scramjet/symbols"; import { STHRestAPI, CPMConnectorOptions, @@ -291,7 +291,7 @@ export class CPMConnector extends TypedEmitter { return message; }).catch((e: any) => { - this.logger.error("communicationChannel error", e.message); + this.logger.warn("communicationChannel error", e.message); }); this.communicationStream = new StringStream().JSONStringify().resume(); @@ -386,7 +386,7 @@ export class CPMConnector extends TypedEmitter { }); this.verserClient.once("error", async (error: any) => { - this.logger.error("VerserClient error", error); + this.logger.warn("VerserClient error", error); try { await this.reconnect(); @@ -408,9 +408,7 @@ export class CPMConnector extends TypedEmitter { this.connection?.removeAllListeners(); this.connected = false; - this.logger.trace("Tunnel closed", this.getId()); - - this.logger.info("CPM connection closed."); + this.logger.info("CPM connection closed.", connectionStatusCode, this.getId()); if (this.loadInterval) { clearInterval(this.loadInterval); @@ -584,13 +582,12 @@ export class CPMConnector extends TypedEmitter { * @param {string} instance Instance details. * @param {SequenceMessageCode} instanceStatus Instance status. */ - async sendInstanceInfo(instance: Instance, instanceStatus: InstanceMessageCode): Promise { - this.logger.trace("Send instance status update", instanceStatus); + async sendInstanceInfo(instance: Instance): Promise { + this.logger.trace("Send instance status update", instance.status); + await this.communicationStream?.whenWrote( - [CPMMessageCode.INSTANCE, { instance, status: instanceStatus }] + [CPMMessageCode.INSTANCE, { instance }] ); - - this.logger.trace("Instance status update sent", instanceStatus); } /** diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index cc4451882..7bff15a3d 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -1,50 +1,51 @@ +import { + AppError, + CSIControllerError, + HostError, + InstanceAdapterError, + MessageUtilities +} from "@scramjet/model"; +import { development } from "@scramjet/sth-config"; + import { APIRoute, AppConfig, DownstreamStreamsConfig, EncodedMessage, + EventMessageData, HandshakeAcknowledgeMessage, + HostProxy, ICommunicationHandler, + ILifeCycleAdapterRun, + InstanceLimits, + InstanceStats, + IObjectLogger, + MessageDataType, + MonitoringMessageData, + OpResponse, ParsedMessage, PassThroughStreamsConfig, ReadableStream, SequenceInfo, - WritableStream, - InstanceConfig, - ILifeCycleAdapterRun, - MessageDataType, - IObjectLogger, - STHRestAPI, STHConfiguration, - InstanceLimits, - MonitoringMessageData, - InstanceStats, - OpResponse, + STHRestAPI, StopSequenceMessageData, - HostProxy, - EventMessageData, + WritableStream } from "@scramjet/types"; -import { - AppError, - CSIControllerError, - CommunicationHandler, - HostError, - MessageUtilities, - InstanceAdapterError, -} from "@scramjet/model"; -import { CommunicationChannel as CC, InstanceStatus, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols"; +import { CommunicationChannel as CC, InstanceStatus, RunnerMessageCode } from "@scramjet/symbols"; import { Duplex, PassThrough, Readable } from "stream"; -import { development } from "@scramjet/sth-config"; -import { DataStream } from "scramjet"; +import { DuplexStream, getRouter } from "@scramjet/api-server"; import { EventEmitter, once } from "events"; import { ServerResponse } from "http"; -import { DuplexStream, getRouter } from "@scramjet/api-server"; +import { DataStream } from "scramjet"; import { getInstanceAdapter } from "@scramjet/adapters"; -import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility"; import { ObjLogger } from "@scramjet/obj-logger"; +import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; +import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility"; import { ReasonPhrases } from "http-status-codes"; +import { mapRunnerExitCode } from "./utils"; /** * @TODO: Runner exits after 10secs and k8s client checks status every 500ms so we need to give it some time @@ -54,6 +55,7 @@ import { ReasonPhrases } from "http-status-codes"; const runnerExitDelay = 15000; type Events = { + ping: (pingMessage: MessageDataType) => void; pang: (payload: MessageDataType) => void; event: (payload: EventMessageData) => void; hourChime: () => void; @@ -65,8 +67,11 @@ type Events = { const BPMux = require("bpmux").BPMux; +export type CSIControllerInfo = { ports?: any; created?: Date; started?: Date; ended?: Date; }; /** * Handles all Instance lifecycle, exposes instance's HTTP API. + * + * @todo write interface for CSIController and CSIDispatcher */ export class CSIController extends TypedEmitter { id: string; @@ -76,7 +81,6 @@ export class CSIController extends TypedEmitter { private keepAliveRequested?: boolean; private _lastStats?: MonitoringMessageData; private bpmux: any; - private adapter: string; get lastStats(): InstanceStats { return { @@ -88,16 +92,15 @@ export class CSIController extends TypedEmitter { } }; } - hostProxy: HostProxy; - sthConfig: STHConfiguration; limits: InstanceLimits = {}; + runnerSystemInfo: RunnerConnectInfo["system"]; sequence: SequenceInfo; appConfig: AppConfig; instancePromise?: Promise<{ message: string, exitcode: number; status: InstanceStatus }>; args: Array | undefined; controlDataStream?: DataStream; router?: APIRoute; - info: { ports?: any; created?: Date; started?: Date; ended?: Date; } = {}; + info: CSIControllerInfo = {}; status: InstanceStatus; terminated?: { exitcode: number; reason: string; }; provides?: string; @@ -113,6 +116,9 @@ export class CSIController extends TypedEmitter { apiOutput = new PassThrough(); apiInputEnabled = true; + executionTime: number = -1; + inputHeadersSent = false; + /** * Topic to which the output stream should be routed */ @@ -166,44 +172,34 @@ export class CSIController extends TypedEmitter { public localEmitter: EventEmitter & { lastEvents: { [evname: string]: any } }; - communicationHandler: ICommunicationHandler; - constructor( - id: string, - sequence: SequenceInfo, - payload: STHRestAPI.StartSequencePayload, - communicationHandler: CommunicationHandler, - sthConfig: STHConfiguration, - hostProxy: HostProxy, - chosenAdapter: STHConfiguration["runtimeAdapter"] = sthConfig.runtimeAdapter + private handshakeMessage: Omit, "created">, + public communicationHandler: ICommunicationHandler, + private sthConfig: STHConfiguration, + private hostProxy: HostProxy, + private adapter: STHConfiguration["runtimeAdapter"] = sthConfig.runtimeAdapter ) { super(); - - this.id = id; - this.adapter = chosenAdapter; - this.sequence = sequence; - this.appConfig = payload.appConfig; - this.sthConfig = sthConfig; - this.args = payload.args; - this.outputTopic = payload.outputTopic; - this.inputTopic = payload.inputTopic; - this.hostProxy = hostProxy; + this.id = this.handshakeMessage.id; + this.runnerSystemInfo = this.handshakeMessage.payload.system; + this.sequence = this.handshakeMessage.sequenceInfo; + this.appConfig = this.handshakeMessage.payload.appConfig; + this.args = this.handshakeMessage.payload.args; + this.outputTopic = this.handshakeMessage.payload.outputTopic; + this.inputTopic = this.handshakeMessage.payload.inputTopic; this.limits = { - memory: payload.limits?.memory || sthConfig.docker.runner.maxMem, - gpu: payload.limits?.gpu + memory: handshakeMessage.payload.limits?.memory || sthConfig.docker.runner.maxMem, + gpu: handshakeMessage.payload.limits?.gpu }; this.instanceLifetimeExtensionDelay = +sthConfig.timings.instanceLifetimeExtensionDelay; - this.communicationHandler = communicationHandler; - this.logger = new ObjLogger(this, { id }); + this.logger = new ObjLogger(this, { id: this.id }); this.localEmitter = Object.assign( new EventEmitter(), { lastEvents: {} } ); - this.logger.debug("Constructor executed"); - this.info.created = new Date(); this.status = InstanceStatus.INITIALIZING; this.upStreams = [ @@ -218,15 +214,19 @@ export class CSIController extends TypedEmitter { ]; } - async start() { - const i = new Promise((res, rej) => { + async start(): Promise { + const i = new Promise((res, rej) => { this.initResolver = { res, rej }; this.startInstance(); }); i.then(() => this.main()).catch(async (e) => { this.logger.info("Instance status: errored", e); + this.status ||= InstanceStatus.ERRORED; + + this.executionTime = this.info.created ? (Date.now() - this.info.created!.getTime()) / 1000 : -1; + this.setExitInfo(e.exitcode, e.message); this.emit("error", e); @@ -242,7 +242,7 @@ export class CSIController extends TypedEmitter { async main() { this.status = InstanceStatus.RUNNING; - this.logger.trace("Instance started"); + this.logger.trace("Main. Current status:", this.status); let code = -1; @@ -268,6 +268,8 @@ export class CSIController extends TypedEmitter { } this.info.ended = new Date(); + this.executionTime = (this.info.ended.getTime() - this.info.created!.getTime()) / 1000; + this.emit("terminated", code); this.logger.trace("Finalizing..."); @@ -282,30 +284,21 @@ export class CSIController extends TypedEmitter { this._instanceAdapter.logger.pipe(this.logger, { end: false }); - const instanceConfig: InstanceConfig = { - ...this.sequence.config, - limits: this.limits, - instanceAdapterExitDelay: this.sthConfig.timings.instanceAdapterExitDelay - }; + this.endOfSequence = this._instanceAdapter.waitUntilExit(undefined, this.id, this.sequence); + // @todo this also is moved to CSIDispatcher in entirety const instanceMain = async () => { try { this.status = InstanceStatus.STARTING; - await this.instanceAdapter.init(); - this.logger.trace("Streams hooked and routed"); - this.endOfSequence = this.instanceAdapter.run( - instanceConfig, - this.sthConfig.host.instancesServerPort, - this.id - ); - this.logger.trace("Sequence initialized"); const exitcode = await this.endOfSequence; + this.logger.trace("End of sequence"); + if (exitcode > 0) { this.status = InstanceStatus.ERRORED; this.logger.error("Crashlog", await this.instanceAdapter.getCrashLog()); @@ -325,7 +318,7 @@ export class CSIController extends TypedEmitter { }; this.instancePromise = instanceMain() - .then((exitcode) => this.mapRunnerExitCode(exitcode)) + .then((exitcode) => mapRunnerExitCode(exitcode, this.sequence)) .catch((error) => { this.logger.error("Instance promise rejected", error); this.initResolver?.rej(error); @@ -333,6 +326,7 @@ export class CSIController extends TypedEmitter { return error; }); + // @todo - this should be checked by CSIController, but Dispatcher should know about this via event listener. this.instancePromise.finally(() => { this.heartBeatResolver?.res(this.id); }).catch(() => 0); @@ -345,74 +339,6 @@ export class CSIController extends TypedEmitter { }); } - // eslint-disable-next-line complexity - private mapRunnerExitCode(exitcode: number): Promise< - { message: string, exitcode: number, status: InstanceStatus } - > { - // eslint-disable-next-line default-case - switch (exitcode) { - case RunnerExitCode.INVALID_ENV_VARS: { - return Promise.reject({ - message: "Runner was started with invalid configuration. This is probably a bug in STH.", - exitcode: RunnerExitCode.INVALID_ENV_VARS, - status: InstanceStatus.ERRORED - }); - } - case RunnerExitCode.PODS_LIMIT_REACHED: { - return Promise.reject({ - message: "Instance limit reached", - exitcode: RunnerExitCode.PODS_LIMIT_REACHED, - status: InstanceStatus.ERRORED - }); - } - case RunnerExitCode.INVALID_SEQUENCE_PATH: { - return Promise.reject({ - message: `Sequence entrypoint path ${this.sequence.config.entrypointPath} is invalid. ` + - "Check `main` field in Sequence package.json", - exitcode: RunnerExitCode.INVALID_SEQUENCE_PATH, - status: InstanceStatus.ERRORED - }); - } - case RunnerExitCode.SEQUENCE_FAILED_ON_START: { - return Promise.reject({ - message: "Sequence failed on start", - exitcode: RunnerExitCode.SEQUENCE_FAILED_ON_START, - status: InstanceStatus.ERRORED - }); - } - case RunnerExitCode.SEQUENCE_FAILED_DURING_EXECUTION: { - return Promise.reject({ - message: "Sequence failed during execution", - exitcode: RunnerExitCode.SEQUENCE_FAILED_DURING_EXECUTION, - status: InstanceStatus.ERRORED - }); - } - case RunnerExitCode.SEQUENCE_UNPACK_FAILED: { - return Promise.reject({ - message: "Sequence unpack failed", - exitcode: RunnerExitCode.SEQUENCE_UNPACK_FAILED, - status: InstanceStatus.ERRORED - }); - } - case RunnerExitCode.KILLED: { - return Promise.resolve({ - message: "Instance killed", exitcode: RunnerExitCode.KILLED, status: InstanceStatus.COMPLETED - }); - } - case RunnerExitCode.STOPPED: { - return Promise.resolve({ - message: "Instance stopped", exitcode: RunnerExitCode.STOPPED, status: InstanceStatus.COMPLETED - }); - } - } - - if (exitcode > 0) { - return Promise.reject({ message: "Runner failed", exitcode, status: InstanceStatus.ERRORED }); - } - - return Promise.resolve({ message: "Instance completed", exitcode, status: InstanceStatus.COMPLETED }); - } - async cleanup() { await this.instanceAdapter.cleanup(); @@ -480,8 +406,27 @@ export class CSIController extends TypedEmitter { .pipe(this.upStreams[CC.CONTROL]); this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => { + const { status, payload, inputHeadersSent } = message[1]; + + this.status = status || InstanceStatus.RUNNING; + this.inputHeadersSent = inputHeadersSent; + + if (!payload) { + this.emit("error", "No payload in ping!"); + + return null; + } + + this.args = payload.args; + this.info.created = new Date(message[1].created); + + this.provides ||= this.outputTopic || payload?.outputTopic; + this.requires ||= this.inputTopic || payload?.inputTopic; + await this.handleHandshake(message); + this.emit("ping", message[1]); + return null; }); @@ -503,6 +448,10 @@ export class CSIController extends TypedEmitter { }); this.communicationHandler.addMonitoringHandler(RunnerMessageCode.MONITORING, async message => { + await this.controlDataStream?.whenWrote( + MessageUtilities.serializeMessage({ msgCode: RunnerMessageCode.MONITORING_REPLY }) + ); + const stats = await this.instanceAdapter.stats(message[1]); this._lastStats = stats; @@ -510,6 +459,7 @@ export class CSIController extends TypedEmitter { this.heartBeatTick(); message[1] = stats; + return message; }, true); @@ -531,20 +481,38 @@ export class CSIController extends TypedEmitter { this.upStreams[CC.MONITORING].resume(); } + // TODO: refactor out of CSI Controller - this should be in async handleHandshake(message: EncodedMessage) { - this.logger.debug("PING received", message); + this.logger.debug("PING received", JSON.stringify(message)); + + if (message[1].ports) { + this.logger.trace("Received a PING message with ports config"); + } + + this.inputHeadersSent = !!message[1].inputHeadersSent; + + this.logger.info("Headers already sent for input?", this.inputHeadersSent); - if (!message[1].ports) { - this.logger.trace("Received a PING message but didn't receive ports config"); + if (this.instanceAdapter.setRunner) { + await this.instanceAdapter.setRunner({ + ...message[1].payload.system, + id: this.id + }); } this.info.ports = message[1].ports; + this.sequence = message[1].sequenceInfo; + + this.inputTopic = message[1].payload?.inputTopic; + this.outputTopic = message[1].payload?.outputTopic; + // TODO: add message to initiate the instance adapter if (this.controlDataStream) { const pongMsg: HandshakeAcknowledgeMessage = { msgCode: RunnerMessageCode.PONG, appConfig: this.appConfig, - args: this.args + args: this.args, + //runtimeId:? }; await this.controlDataStream.whenWrote(MessageUtilities.serializeMessage(pongMsg)); @@ -552,10 +520,11 @@ export class CSIController extends TypedEmitter { throw new CSIControllerError("UNINITIALIZED_STREAM", "control"); } - this.info.started = new Date(); - this.logger.info("Instance started", this.info); + this.info.started = new Date(); //@TODO: set by runner? + this.logger.info("Handshake", JSON.stringify(message, undefined)); } + //@TODO: ! unhookup ! set proper state for reconnecting ! async handleInstanceConnect(streams: DownstreamStreamsConfig) { try { this.hookupStreams(streams); @@ -567,6 +536,7 @@ export class CSIController extends TypedEmitter { streams[8]?.end(); }); this.bpmux.on("peer_multiplex", (socket: Duplex, _data: any) => this.hostProxy.onInstanceRequest(socket)); + await once(this, "pang"); this.initResolver?.res(); } catch (e: any) { @@ -575,8 +545,6 @@ export class CSIController extends TypedEmitter { } createInstanceAPIRouter() { - let inputHeadersSent = false; - if (!this.upStreams) { throw new AppError("UNATTACHED_STREAMS"); } @@ -589,11 +557,11 @@ export class CSIController extends TypedEmitter { * @experimental */ this.router.duplex("/inout", (duplex, _headers) => { - if (!inputHeadersSent) { + if (!this.inputHeadersSent) { this.downStreams![CC.IN].write(`Content-Type: ${_headers["content-type"]}\r\n`); this.downStreams![CC.IN].write("\r\n"); - inputHeadersSent = true; + this.inputHeadersSent = true; } (duplex as unknown as DuplexStream).input.pipe(this.downStreams![CC.IN], { end: false }); @@ -635,7 +603,7 @@ export class CSIController extends TypedEmitter { const contentType = req.headers["content-type"]; // @TODO: Check if subsequent requests have the same content-type. - if (!inputHeadersSent) { + if (!this.inputHeadersSent) { if (contentType === undefined) { return { opStatus: ReasonPhrases.NOT_ACCEPTABLE, error: "Content-Type must be defined" }; } @@ -643,7 +611,7 @@ export class CSIController extends TypedEmitter { stream.write(`Content-Type: ${contentType}\r\n`); stream.write("\r\n"); - inputHeadersSent = true; + this.inputHeadersSent = true; } return stream; @@ -812,6 +780,8 @@ export class CSIController extends TypedEmitter { } getInfo(): STHRestAPI.GetInstanceResponse { + this.logger.debug("Get info [seq, info]", this.sequence, this.info); + return { id: this.id, appConfig: this.appConfig, diff --git a/packages/host/src/lib/csi-dispatcher.ts b/packages/host/src/lib/csi-dispatcher.ts new file mode 100644 index 000000000..dafc5a344 --- /dev/null +++ b/packages/host/src/lib/csi-dispatcher.ts @@ -0,0 +1,293 @@ +import { getInstanceAdapter } from "@scramjet/adapters"; +import { IDProvider } from "@scramjet/model"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { InstanceStatus, RunnerMessageCode } from "@scramjet/symbols"; +import { ContentType, EventMessageData, HostProxy, ICommunicationHandler, IObjectLogger, Instance, InstanceConfig, MessageDataType, PangMessageData, PingMessageData, STHConfiguration, STHRestAPI, SequenceInfo, SequenceInfoInstance } from "@scramjet/types"; +import { TypedEmitter } from "@scramjet/utility"; +import { CSIController, CSIControllerInfo } from "./csi-controller"; +import { InstanceStore } from "./instance-store"; +import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter"; +import TopicId from "./serviceDiscovery/topicId"; +import { Readable, Writable } from "stream"; +import SequenceStore from "./sequenceStore"; +import { mapRunnerExitCode } from "./utils"; + +export type DispatcherErrorEventData = { id:string, err: any }; +export type DispatcherInstanceEndEventData = { id: string, code: number, info: CSIControllerInfo & { executionTime: number }, sequence: SequenceInfoInstance}; +export type DispatcherInstanceTerminatedEventData = DispatcherInstanceEndEventData; +export type DispatcherInstanceEstablishedEventData = Instance; +export type DispatcherChimeEvent = { id: string, language: string, seqId: string }; + +type Events = { + pang: (payload: MessageDataType) => void; + hourChime: (data: DispatcherChimeEvent) => void; + error: (data: DispatcherErrorEventData) => void; + stop: (code: number) => void; + end: (data: DispatcherInstanceEndEventData) => void; + terminated: (data: DispatcherInstanceEndEventData) => void; + established: (data: DispatcherInstanceEstablishedEventData) => void; + event: (eventData: { event: EventMessageData, id: string }) => void; +}; + +type CSIDispatcherOpts = { + instanceStore: typeof InstanceStore, + sequenceStore: SequenceStore, + serviceDiscovery: ServiceDiscovery, + STHConfig: STHConfiguration +} + +export class CSIDispatcher extends TypedEmitter { + public logger: IObjectLogger; + public instanceStore: typeof InstanceStore; + public sequenceStore: SequenceStore; + private STHConfig: STHConfiguration; + private serviceDiscovery: ServiceDiscovery; + + constructor(opts: CSIDispatcherOpts) { + super(); + + this.logger = new ObjLogger(this); + this.instanceStore = opts.instanceStore; + this.sequenceStore = opts.sequenceStore; + this.STHConfig = opts.STHConfig; + this.serviceDiscovery = opts.serviceDiscovery; + } + + async createCSIController( + id: string, + sequenceInfo: SequenceInfo, + payload: STHRestAPI.StartSequencePayload, + communicationHandler: ICommunicationHandler, + config: STHConfiguration, + instanceProxy: HostProxy) { + sequenceInfo.instances = sequenceInfo.instances || []; + + const csiController = new CSIController({ + id, + sequenceInfo, + payload, + status: InstanceStatus.INITIALIZING, + inputHeadersSent: false + }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter); + + this.logger.trace("CSIController created", id, sequenceInfo); + + csiController.logger.pipe(this.logger, { end: false }); + + communicationHandler.logger.pipe(this.logger, { end: false }); + + csiController + .on("error", (err) => { + this.logger.error("CSIController errored", err.message, err.exitcode); + this.emit("error", { id, err }); + }) + .on("event", async (event: EventMessageData) => { + this.logger.info("Received event", event); + this.emit("event", { event, id: csiController.id }); + }) + .on("hourChime", () => { + this.emit("hourChime", { + id: csiController.id, + language: csiController.sequence.config.language, + seqId: csiController.sequence.id + }); + }) + + // eslint-disable-next-line complexity + .on("pang", async (data: PangMessageData) => { + this.logger.trace("PANG received", [csiController.id, data]); + + if ((data.requires || data.provides) && !data.contentType) { + this.logger.warn("Missing topic content-type"); + } + + if (data.requires && data.contentType) { + this.logger.trace("Routing topic to Instance input", data.requires); + + await this.serviceDiscovery.routeTopicToStream( + { topic: new TopicId(data.requires), contentType: data.contentType as ContentType }, + csiController.getInputStream() + ); + + csiController.inputHeadersSent = true; + + await this.serviceDiscovery.update({ + requires: data.requires, contentType: data.contentType, topicName: data.requires, status: "add" + }); + } + + if (data.provides && !csiController.outputRouted && data.contentType) { + this.logger.trace("Routing Sequence output to topic", data.provides); + + await this.serviceDiscovery.routeStreamToTopic( + csiController.getOutputStream(), + { topic: new TopicId(data.provides), contentType: data.contentType as ContentType } + ); + + csiController.outputRouted = true; + + await this.serviceDiscovery.update({ + localProvider: csiController.id, provides: data.provides, contentType: data.contentType!, topicName: data.provides, status: "add" + }); + } + }) + .on("ping", (pingMessage: PingMessageData) => { + this.logger.info("Ping received", JSON.stringify(pingMessage)); + + if (pingMessage.sequenceInfo.config.type !== this.STHConfig.runtimeAdapter) { + this.logger.error("Incorrect Instance adapter"); + + return; + } + + const seq = this.sequenceStore.getById(csiController.sequence.id); + + if (seq) { + seq.instances.push(csiController.id); + } else { + this.logger.warn("Instance of not existing sequence connected"); + //@TODO: ? + } + + this.emit("established", { id: pingMessage.id, sequence: pingMessage.sequenceInfo }); + }) + .on("end", async (code: number) => { + this.logger.trace("csiControllerontrolled ended", `id: ${csiController.id}`, `Exit code: ${code}`); + + if (csiController.provides && csiController.provides !== "") { + csiController.getOutputStream().unpipe(this.serviceDiscovery.getData( + { + topic: new TopicId(csiController.provides), + contentType: "" as ContentType + } + ) as Writable); + } + + csiController.logger.unpipe(this.logger); + + this.emit("end", { + id, + code, + info: { + executionTime: csiController.executionTime + }, + sequence: csiController.sequence + }); + + const seq = this.sequenceStore.getById(csiController.sequence.id); + + if (seq) { + seq.instances = seq.instances.filter(i => i !== csiController.id); + } + + delete this.instanceStore[csiController.id]; + }) + .once("terminated", (code) => { + if (csiController.requires && csiController.requires !== "") { + (this.serviceDiscovery.getData({ + topic: new TopicId(csiController.requires), + contentType: "" as ContentType, + }) as Readable + ).unpipe(csiController.getInputStream()!); + } + + this.emit("terminated", { + id, + code, + info: { + executionTime: csiController.executionTime + }, + sequence: csiController.sequence + }); + }); + + csiController.start().catch((e) => { + this.logger.error("CSIC start error", csiController.id, e); + this.emit("error", { id: csiController.id, err: "fatal" }); + }); + + this.logger.trace("csiController started", id); + + this.instanceStore[id] = csiController; + + return csiController; + } + + async startRunner(sequence: SequenceInfo, payload: STHRestAPI.StartSequencePayload) { + this.logger.debug("Preparing Runner..."); + + const limits = { + memory: payload.limits?.memory || this.STHConfig.docker.runner.maxMem + }; + const id = payload.instanceId || IDProvider.generate(); + + const instanceAdapter = getInstanceAdapter(this.STHConfig.runtimeAdapter, this.STHConfig, id); + const instanceConfig: InstanceConfig = { + ...sequence.config, + limits, + instanceAdapterExitDelay: this.STHConfig.timings.instanceAdapterExitDelay + }; + + instanceAdapter.logger.pipe(this.logger); + + this.logger.debug("Initializing Adapter..."); + + await instanceAdapter.init(); + + this.logger.debug("Dispatching..."); + + const dispatchResultCode = await instanceAdapter.dispatch( + instanceConfig, + this.STHConfig.host.instancesServerPort, + id, + sequence, + payload + ); + + if (dispatchResultCode !== 0) { + this.logger.warn("Dispatch result code:", dispatchResultCode); + throw await mapRunnerExitCode(dispatchResultCode, sequence); + } + + this.logger.debug("Dispatched. Waiting for connection...", id); + + let established = false; + + return await Promise.race([ + new Promise((resolve, _reject) => { + const resolveFunction = (instance: Instance) => { + if (instance.id === id) { + this.logger.debug("Established", id); + + this.off("established", resolveFunction); + established = true; + resolve(); + } + }; + + this.on("established", resolveFunction); + }).then(() => ({ + id, + appConfig: payload.appConfig, + args: payload.args, + sequenceId: sequence.id, + info: {}, + limits, + sequence + })), + // handle fast fail - before connection is established. + Promise.resolve().then( + () => instanceAdapter.waitUntilExit(undefined, id, sequence) + .then(async (exitCode: number) => { + if (!established) { + this.logger.info("Exited before established", id, exitCode); + + return mapRunnerExitCode(exitCode, sequence); + } + + return undefined; + }) + ) + ]); + } +} diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index e5fc5f9e5..48288fdcb 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -1,62 +1,69 @@ import findPackage from "find-package-json"; import { ReasonPhrases, StatusCodes } from "http-status-codes"; -import { Duplex } from "stream"; import { IncomingHttpHeaders, IncomingMessage, Server, ServerResponse } from "http"; import { AddressInfo } from "net"; +import { Duplex } from "stream"; +import { CommunicationHandler, HostError, IDProvider } from "@scramjet/model"; +import { HostHeaders, InstanceMessageCode, InstanceStatus, RunnerMessageCode, SequenceMessageCode } from "@scramjet/symbols"; import { APIExpose, - ContentType, CPMConnectorOptions, EventMessageData, HostProxy, IComponent, IMonitoringServerConstructor, IObjectLogger, + Instance, LogLevel, MonitoringServerConfig, NextCallback, OpResponse, ParsedMessage, PublicSTHConfiguration, - SequenceInfo, - StartSequenceDTO, STHConfiguration, STHRestAPI, + SequenceInfo, + StartSequenceDTO } from "@scramjet/types"; -import { CommunicationHandler, HostError, IDProvider } from "@scramjet/model"; -import { HostHeaders, InstanceMessageCode, RunnerMessageCode, SequenceMessageCode } from "@scramjet/symbols"; -import { ObjLogger, prettyPrint } from "@scramjet/obj-logger"; -import { LoadCheck, LoadCheckConfig } from "@scramjet/load-check"; import { getSequenceAdapter, initializeRuntimeAdapters } from "@scramjet/adapters"; +import { LoadCheck, LoadCheckConfig } from "@scramjet/load-check"; +import { ObjLogger, prettyPrint } from "@scramjet/obj-logger"; -import { CPMConnector } from "./cpm-connector"; -import { CSIController } from "./csi-controller"; import { CommonLogsPipe } from "./common-logs-pipe"; +import { CPMConnector } from "./cpm-connector"; import { InstanceStore } from "./instance-store"; -import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter"; -import { SocketServer } from "./socket-server"; -import { DataStream } from "scramjet"; -import { optionsMiddleware } from "./middlewares/options"; -import { corsMiddleware } from "./middlewares/cors"; +import { DuplexStream } from "@scramjet/api-server"; import { ConfigService, development } from "@scramjet/sth-config"; import { isStartSequenceDTO, isStartSequenceEndpointPayloadDTO, readJsonFile, defer, FileBuilder } from "@scramjet/utility"; + +import { DataStream } from "scramjet"; import { inspect } from "util"; -import { auditMiddleware, logger as auditMiddlewareLogger } from "./middlewares/audit"; + import { AuditedRequest, Auditor } from "./auditor"; +import { auditMiddleware, logger as auditMiddlewareLogger } from "./middlewares/audit"; +import { corsMiddleware } from "./middlewares/cors"; +import { optionsMiddleware } from "./middlewares/options"; + +import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter"; +import { SocketServer } from "./socket-server"; + import { getTelemetryAdapter, ITelemetryAdapter } from "@scramjet/telemetry"; import { cpus, homedir, totalmem } from "os"; import { S3Client } from "./s3-client"; -import { DuplexStream } from "@scramjet/api-server"; + import { existsSync, mkdirSync, readFileSync } from "fs"; -import TopicId from "./serviceDiscovery/topicId"; import TopicRouter from "./serviceDiscovery/topicRouter"; + import SequenceStore from "./sequenceStore"; -import { GetSequenceResponse } from "@scramjet/types/src/rest-api-sth"; + import { loadModule, logger as loadModuleLogger } from "@scramjet/module-loader"; + +import { CSIDispatcher, DispatcherChimeEvent as DispatcherChimeEventData, DispatcherErrorEventData, DispatcherInstanceEndEventData, DispatcherInstanceEstablishedEventData, DispatcherInstanceTerminatedEventData } from "./csi-dispatcher"; + import { parse } from "path"; const buildInfo = readJsonFile("build.info", __dirname, ".."); @@ -153,23 +160,12 @@ export class Host implements IComponent { */ s3Client?: S3Client; + csiDispatcher: CSIDispatcher; + private instanceProxy: HostProxy = { onInstanceRequest: (socket: Duplex) => { this.api.server.emit("connection", socket); }, }; - /** - * Sets listener for connections to socket server. - */ - private attachListeners() { - this.socketServer.on("connect", async (id, streams) => { - this.logger.debug("Instance connected", id); - - await this.instancesStore[id].handleInstanceConnect( - streams - ); - }); - } - public get service(): string { return name; } @@ -200,6 +196,7 @@ export class Host implements IComponent { constructor(apiServer: APIExpose, socketServer: SocketServer, sthConfig: STHConfiguration) { this.config = sthConfig; this.publicConfig = ConfigService.getConfigInfo(sthConfig); + this.sequenceStore = new SequenceStore(); this.logger = new ObjLogger( this, @@ -269,6 +266,17 @@ export class Host implements IComponent { this.instanceBase = `${this.config.host.apiBase}/instance`; this.topicsBase = `${this.config.host.apiBase}/topic`; + this.csiDispatcher = new CSIDispatcher({ + instanceStore: this.instancesStore, + sequenceStore: this.sequenceStore, + serviceDiscovery: this.serviceDiscovery, + STHConfig: sthConfig + }); + + this.csiDispatcher.logger.pipe(this.logger); + + this.attachDispatcherEvents(); + if (this.config.host.apiBase.includes(":")) { throw new HostError("API_CONFIGURATION_ERROR", "Can't expose an API on paths including a semicolon..."); } @@ -299,6 +307,102 @@ export class Host implements IComponent { return monitoringServer.start(); } + attachDispatcherEvents() { + this.csiDispatcher + .on("event", async ({ event, id }) => { + await this.eventBus({ source: id, ...event }); + }) + .on("end", async (eventData: DispatcherInstanceEndEventData) => { + await this.handleDispatcherEndEvent(eventData); + }) + .on("established", async (instance: Instance) => { + await this.handleDispatcherEstablishedEvent(instance); + }) + .on("terminated", async (eventData: DispatcherInstanceTerminatedEventData) => { + await this.handleDispatcherTerminatedEvent(eventData); + }) + .on("error", (errorData: DispatcherErrorEventData) => { + this.pushTelemetry("Instance error", { ...errorData }, "error"); + }) + .on("hourChime", (data: DispatcherChimeEventData) => { + this.pushTelemetry("Instance hour chime", data); + }); + } + + /** + * Check for Sequence. + * Pass information about connected instance to monitoring and platform services. + * + * @param {DispatcherInstanceEstablishedEventData} instance Instance data. + */ + async handleDispatcherEstablishedEvent(instance: DispatcherInstanceEstablishedEventData) { + this.logger.info("Checking Sequence..."); + + const seq = this.sequenceStore.getById(instance.sequence.id); + + if (!seq && this.cpmConnector?.connected) { + this.logger.info("Sequence not found. Checking Store..."); + + try { + const extSeq = await this.getExternalSequence(instance.sequence.id); + + this.logger.info("Sequence acquired.", extSeq); + } catch (e) { + this.logger.warn("Sequence not found in Store. Instance has no Sequence."); + } + } + + this.auditor.auditInstance(instance.id, InstanceMessageCode.INSTANCE_CONNECTED); + + await this.cpmConnector?.sendInstanceInfo({ + id: instance.id, + sequence: instance.sequence + }); + + this.pushTelemetry("Instance connected", { + id: instance.id, + seqId: instance.sequence.id + }); + } + + /** + * Pass information about ended instance to monitoring and platform services. + * + * @param {DispatcherInstanceEndEventData} instance Event details. + */ + async handleDispatcherEndEvent(instance: DispatcherInstanceEndEventData) { + this.auditor.auditInstance(instance.id, InstanceMessageCode.INSTANCE_ENDED); + + await this.cpmConnector?.sendInstanceInfo({ + id: instance.id, + status: InstanceStatus.GONE, + sequence: instance.sequence + }); + + this.pushTelemetry("Instance ended", { + executionTime: instance.info.executionTime.toString(), + id: instance.id, + code: instance.code.toString(), + seqId: instance.sequence.id + }); + } + + /** + * Pass information about terminated instance to monitoring services. + * + * @param {DispatcherInstanceTerminatedEventData} eventData Event details. + */ + async handleDispatcherTerminatedEvent(eventData: DispatcherInstanceTerminatedEventData) { + this.auditor.auditInstance(eventData.id, InstanceMessageCode.INSTANCE_TERMINATED); + + this.pushTelemetry("Instance terminated", { + executionTime: eventData.info.executionTime.toString(), + id: eventData.id, + code: eventData.code.toString(), + seqId: eventData.sequence.id + }); + } + getId() { let id = this.config.host.id; @@ -483,11 +587,12 @@ export class Host implements IComponent { return; } - await this.startCSIController(sequence, { + await this.csiDispatcher.startRunner(sequence, { appConfig: seqenceConfig.appConfig || {}, args: seqenceConfig.args, instanceId: seqenceConfig.instanceId }); + this.logger.debug("Starting sequence based on config", seqenceConfig); }) .run(); @@ -504,7 +609,7 @@ export class Host implements IComponent { connector.init(); connector.on("connect", async () => { - await connector.sendSequencesInfo(this.getSequences()); + await connector.sendSequencesInfo(this.getSequences().map(s => ({ ...s, status: SequenceMessageCode.SEQUENCE_CREATED }))); await connector.sendInstancesInfo(this.getInstances()); await connector.sendTopicsInfo(this.getTopics()); @@ -674,6 +779,8 @@ export class Host implements IComponent { error: `The sequence ${id} does not exist.` }; } + // eslint-disable-next-line no-console + this.logger.info("Instances of sequence", sequence.id, sequence.instances); if (sequence.instances.length > 0) { const instances = [...sequence.instances].every((instanceId) => { @@ -708,8 +815,9 @@ export class Host implements IComponent { this.sequenceStore.delete(id); this.logger.trace("Sequence removed:", id); - // eslint-disable-next-line max-len - await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_DELETED, sequence as unknown as GetSequenceResponse); + + await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_DELETED, sequence as unknown as STHRestAPI.GetSequenceResponse); + this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_DELETED); return { @@ -766,6 +874,8 @@ export class Host implements IComponent { * Used to recover Sequences information after restart. */ async identifyExistingSequences() { + this.logger.trace("Identifing existing sequences"); + const adapter = await initializeRuntimeAdapters(this.config); const sequenceAdapter = getSequenceAdapter(adapter, this.config); @@ -836,7 +946,7 @@ export class Host implements IComponent { this.logger.trace(`Sequence identified: ${config.id}`); // eslint-disable-next-line max-len - await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as GetSequenceResponse); + await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as STHRestAPI.GetSequenceResponse); this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_CREATED); this.pushTelemetry("Sequence uploaded", { language: config.language.toLowerCase(), seqId: id }); @@ -904,6 +1014,8 @@ export class Host implements IComponent { } async getExternalSequence(id: string): Promise { + this.logger.info("Requesting Sequence from external source"); + let packageStream: IncomingMessage | undefined; try { @@ -927,7 +1039,7 @@ export class Host implements IComponent { return this.sequenceStore.getById(result.id)!; } catch (e: any) { - this.logger.error("Error requesting sequence", e.message); + this.logger.warn("Can't aquire Sequence from external source", e.message); throw new Error(ReasonPhrases.NOT_FOUND); } @@ -945,78 +1057,66 @@ export class Host implements IComponent { */ // eslint-disable-next-line complexity async handleStartSequence(req: ParsedMessage): Promise> { - try { - if (await this.loadCheck.overloaded()) { - return { - opStatus: ReasonPhrases.INSUFFICIENT_SPACE_ON_RESOURCE, - }; - } - - const sequenceId = req.params?.id as string; - const payload = req.body || ({} as STHRestAPI.StartSequencePayload); + if (await this.loadCheck.overloaded()) { + return { + opStatus: ReasonPhrases.INSUFFICIENT_SPACE_ON_RESOURCE, + }; + } - if (payload.instanceId) { - if (!isStartSequenceEndpointPayloadDTO(payload)) { - return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, error: "Invalid Instance id" }; - } + const sequenceId = req.params?.id as string; + const payload = req.body || ({} as STHRestAPI.StartSequencePayload); - if (this.instancesStore[payload.instanceId]) { - return { - opStatus: ReasonPhrases.CONFLICT, - error: "Instance with a given ID already exists" - }; - } + if (payload.instanceId) { + if (!isStartSequenceEndpointPayloadDTO(payload)) { + return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, error: "Invalid Instance id" }; } - let sequence = this.sequenceStore.getByNameOrId(sequenceId); - - if (this.cpmConnector?.connected) { - sequence ||= await this.getExternalSequence(sequenceId).catch((error: ReasonPhrases) => { - this.logger.error("Error getting sequence from external sources", error); - return undefined; - }); + if (this.instancesStore[payload.instanceId]) { + return { + opStatus: ReasonPhrases.CONFLICT, + error: "Instance with a given ID already exists" + }; } + } - if (!sequence) { - return { opStatus: ReasonPhrases.NOT_FOUND }; - } + let sequence = this.sequenceStore.getByNameOrId(sequenceId); - this.logger.info("Start sequence", sequence.id, sequence.config.name); + if (this.cpmConnector?.connected) { + sequence ||= await this.getExternalSequence(sequenceId).catch((error: ReasonPhrases) => { + this.logger.error("Error getting sequence from external sources", error); - const csic = await this.startCSIController(sequence, payload); + return undefined; + }); + } - await this.cpmConnector?.sendInstanceInfo({ - id: csic.id, - appConfig: csic.appConfig, - args: csic.args, - sequence: (info => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { instances, ...rest } = info; + if (!sequence) { + return { opStatus: ReasonPhrases.NOT_FOUND }; + } - return rest; - })(sequence), - ports: csic.info.ports, - created: csic.info.created, - started: csic.info.started, - status: csic.status, - }, InstanceMessageCode.INSTANCE_STARTED); + this.logger.info("Start sequence", sequence.id, sequence.config.name); - this.logger.debug("Instance limits", csic.limits); - this.auditor.auditInstanceStart(csic.id, req as AuditedRequest, csic.limits); - this.pushTelemetry("Instance started", { id: csic.id, language: csic.sequence.config.language, seqId: csic.sequence.id }); + try { + const runner = await this.csiDispatcher.startRunner(sequence, payload); - csic.on("hourChime", () => { - this.pushTelemetry("Instance hour chime", { id: csic.id, language: csic.sequence.config.language, seqId: csic.sequence.id }); - }); + if (runner && "id" in runner) { + this.logger.debug("Instance limits", runner.limits); + this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits); + this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id }); - return { - opStatus: ReasonPhrases.OK, - message: `Sequence ${csic.id} starting`, - id: csic.id - }; + return { + opStatus: ReasonPhrases.OK, + message: `Sequence ${runner.id} starting`, + id: runner.id + }; + } else if (runner) { + throw runner; + } + + throw Error("Unexpected startup error"); } catch (error: any) { this.pushTelemetry("Instance start failed", { error: error.message }, "error"); this.logger.error(error.message); + return { opStatus: ReasonPhrases.BAD_REQUEST, error: error.message @@ -1025,139 +1125,28 @@ export class Host implements IComponent { } /** - * Creates new CSIController {@link CSIController} object and handles its events. - * - * @param {SequenceInfo} sequence Sequence info object. - * @param {STHRestAPI.StartSequencePayload} payload App start configuration. + * Sets listener for connections to socket server. */ - async startCSIController(sequence: SequenceInfo, payload: STHRestAPI.StartSequencePayload): Promise { - const communicationHandler = new CommunicationHandler(); - const id = payload.instanceId || IDProvider.generate(); - - if (isDevelopment) this.logger.debug("CSIC start payload", payload); - - const csic = new CSIController(id, sequence, payload, communicationHandler, this.config, this.instanceProxy); - - csic.logger.pipe(this.logger, { end: false }); - communicationHandler.logger.pipe(this.logger, { end: false }); - - this.logger.trace("CSIController created", id); - - this.instancesStore[id] = csic; - - csic.on("event", async (event: EventMessageData) => { - await this.eventBus({ source: id, ...event }); - }); - csic.on("error", (err) => { - this.pushTelemetry("Instance error", { ...err }, "error"); - this.logger.error("CSIController errored", err.message, err.exitcode); - }); - - // eslint-disable-next-line complexity - csic.on("pang", async (data) => { - this.logger.trace("PANG received", [{ ...data }]); - - if ((data.requires || data.provides) && !data.contentType) { - this.logger.warn("Missing topic content-type", data.provides, data.contentType); - - if (data.provides) { - data.contentType = this.serviceDiscovery.getTopics() - .find(t => t.topic === data.provides)?.contentType; - } - - if (data.contentType) { - this.logger.warn("Content-type set to match existing topic", data.contentType); - } else { - data.contentType = "application/x-ndjson"; - this.logger.warn("Content-type set to default", data.contentType); - } - } - - if (data.requires && !csic.inputRouted && data.contentType) { - this.logger.trace("Routing topic to Sequence input", data.requires); - - await this.serviceDiscovery.routeTopicToStream( - { topic: new TopicId(data.requires), contentType: data.contentType as ContentType }, - csic.getInputStream() - ); - - csic.inputRouted = true; - - await this.serviceDiscovery.update({ - requires: data.requires, - contentType: data.contentType!, - topicName: data.requires, - status: "add" - }); - } - - if (data.provides && !csic.outputRouted && data.contentType) { - this.logger.trace("Routing Sequence output to topic", data.provides); - await this.serviceDiscovery.routeStreamToTopic( - csic.getOutputStream(), - { topic: new TopicId(data.provides), contentType: data.contentType as ContentType }, - // csic.id - ); - - csic.outputRouted = true; - - await this.serviceDiscovery.update({ - provides: data.provides, - contentType: data.contentType!, - topicName: data.provides, - status: "add" - }); - } - }); - - csic.on("end", async (code) => { - this.logger.trace("CSIController ended", `Exit code: ${code}`); - - if (csic.provides && csic.provides !== "") { - const topic = this.serviceDiscovery.getTopic(new TopicId(csic.provides)); - - if (topic) csic.getOutputStream()!.unpipe(topic); - } - - csic.logger.unpipe(this.logger); - - delete InstanceStore[csic.id]; - - sequence.instances = sequence.instances.filter(item => { - return item !== id; - }); - - await this.cpmConnector?.sendInstanceInfo({ - id: csic.id, - sequence: sequence - }, InstanceMessageCode.INSTANCE_ENDED); - - this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED); - }); - - csic.once("terminated", (code) => { - if (csic.requires && csic.requires !== "") { - const topic = this.serviceDiscovery.getTopic(new TopicId(csic.requires)); - - if (topic) topic.unpipe(csic.getInputStream()); + private attachListeners() { + this.socketServer.on("connect", async (id, streams) => { + this.logger.debug("Instance connecting", id); + + if (!this.instancesStore[id]) { + this.logger.info("Creating new CSIController for unknown Instance"); + + await this.csiDispatcher.createCSIController( + id, + {} as SequenceInfo, + {} as STHRestAPI.StartSequencePayload, + new CommunicationHandler(), + this.config, + this.instanceProxy); } - this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED); - this.pushTelemetry("Instance ended", { - executionTime: csic.info.ended && csic.info.started ? ((csic.info.ended?.getTime() - csic.info.started.getTime()) / 1000).toString() : "-1", - id: csic.id, - code: code.toString(), - seqId: csic.sequence.id - }); + await this.instancesStore[id].handleInstanceConnect( + streams + ); }); - - await csic.start(); - - this.logger.trace("CSIController started", id); - - sequence.instances.push(id); - - return csic; } async eventBus(event: EventMessageData) { diff --git a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts index eaeed4761..e5ea3b3de 100644 --- a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts +++ b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts @@ -87,6 +87,8 @@ export class ServiceDiscovery { const topic = this.topicsController.get(topicName); if (topic) { + config.contentType ||= topic.contentType; + if (topic.contentType !== config.contentType) { this.logger.error("Content-type mismatch, existing and requested ", topic.contentType, config.contentType); throw new Error("Content-type mismatch"); @@ -167,6 +169,7 @@ export class ServiceDiscovery { const topic = this.createTopicIfNotExist(topicData); topic.acceptPipe(source); + await this.cpmConnector?.sendTopicInfo({ provides: topicData.topic.toString(), topicName: topicData.topic.toString(), @@ -176,9 +179,8 @@ export class ServiceDiscovery { } async update(data: STHTopicEventData) { - this.logger.trace("Topic update. Send topic info to CPM", data); - if (this.cpmConnector?.connected) { + this.logger.trace("Topic update. Send topic info to CPM", data); await this.cpmConnector?.sendTopicInfo(data); } } diff --git a/packages/host/src/lib/socket-server.ts b/packages/host/src/lib/socket-server.ts index ac48cc069..400d5d1c9 100644 --- a/packages/host/src/lib/socket-server.ts +++ b/packages/host/src/lib/socket-server.ts @@ -45,9 +45,15 @@ export class SocketServer extends TypedEmitter implements IComponent { }); const id = await new Promise((resolve) => { - connection.once("readable", () => { - resolve(connection.read(36).toString()); - }); + const immediateData = connection.read(36); + + if (!immediateData) { + connection.once("readable", () => { + resolve(connection.read(36).toString()); + }); + } else { + resolve(immediateData); + } }); const channel = await new Promise((resolve) => { @@ -71,6 +77,7 @@ export class SocketServer extends TypedEmitter implements IComponent { this.server! .listen(this.port, this.hostname, () => { this.logger.info("SocketServer on", this.server?.address()); + res(); }) .on("error", rej); diff --git a/packages/host/src/lib/utils.ts b/packages/host/src/lib/utils.ts new file mode 100644 index 000000000..87a62d630 --- /dev/null +++ b/packages/host/src/lib/utils.ts @@ -0,0 +1,70 @@ +import { InstanceStatus, RunnerExitCode } from "@scramjet/symbols"; +import { SequenceInfo } from "@scramjet/types"; + +// eslint-disable-next-line complexity +export const mapRunnerExitCode = async (exitcode: number, sequence: SequenceInfo): Promise< +{ message: string, exitcode: number, status: InstanceStatus } +> => { +// eslint-disable-next-line default-case + switch (exitcode) { + case RunnerExitCode.INVALID_ENV_VARS: { + return Promise.reject({ + message: "Runner was started with invalid configuration. This is probably a bug in STH.", + exitcode: RunnerExitCode.INVALID_ENV_VARS, + status: InstanceStatus.ERRORED + }); + } + case RunnerExitCode.PODS_LIMIT_REACHED: { + return Promise.reject({ + message: "Instance limit reached", + exitcode: RunnerExitCode.PODS_LIMIT_REACHED, + status: InstanceStatus.ERRORED + }); + } + case RunnerExitCode.INVALID_SEQUENCE_PATH: { + return Promise.reject({ + message: `Sequence entrypoint path ${sequence.config.entrypointPath} is invalid. ` + + "Check `main` field in Sequence package.json", + exitcode: RunnerExitCode.INVALID_SEQUENCE_PATH, + status: InstanceStatus.ERRORED + }); + } + case RunnerExitCode.SEQUENCE_FAILED_ON_START: { + return Promise.reject({ + message: "Sequence failed on start", + exitcode: RunnerExitCode.SEQUENCE_FAILED_ON_START, + status: InstanceStatus.ERRORED + }); + } + case RunnerExitCode.SEQUENCE_FAILED_DURING_EXECUTION: { + return Promise.reject({ + message: "Sequence failed during execution", + exitcode: RunnerExitCode.SEQUENCE_FAILED_DURING_EXECUTION, + status: InstanceStatus.ERRORED + }); + } + case RunnerExitCode.SEQUENCE_UNPACK_FAILED: { + return Promise.reject({ + message: "Sequence unpack failed", + exitcode: RunnerExitCode.SEQUENCE_UNPACK_FAILED, + status: InstanceStatus.ERRORED + }); + } + case RunnerExitCode.KILLED: { + return Promise.resolve({ + message: "Instance killed", exitcode: RunnerExitCode.KILLED, status: InstanceStatus.COMPLETED + }); + } + case RunnerExitCode.STOPPED: { + return Promise.resolve({ + message: "Instance stopped", exitcode: RunnerExitCode.STOPPED, status: InstanceStatus.COMPLETED + }); + } + } + + if (exitcode > 0) { + return Promise.reject({ message: "Runner failed", exitcode, status: InstanceStatus.ERRORED }); + } + + return Promise.resolve({ message: "Instance completed", exitcode, status: InstanceStatus.COMPLETED }); +}; diff --git a/packages/host/test/serviceDiscovery/sd-discovery.spec.ts b/packages/host/test/serviceDiscovery/sd-discovery.spec.ts index b4a4bc731..deeff275a 100644 --- a/packages/host/test/serviceDiscovery/sd-discovery.spec.ts +++ b/packages/host/test/serviceDiscovery/sd-discovery.spec.ts @@ -21,7 +21,7 @@ beforeEach(() => { serviceDiscovery.cpmConnector = { sendTopicInfo: (data: AddSTHTopicEventData): Promise => { topicInfo = data; - return new Promise((resolve) => resolve()); + return Promise.resolve(); } } as CPMConnector; }); diff --git a/packages/model/src/stream-handler.ts b/packages/model/src/stream-handler.ts index f26dfbdc3..e4a30d2c5 100644 --- a/packages/model/src/stream-handler.ts +++ b/packages/model/src/stream-handler.ts @@ -1,3 +1,4 @@ +import { ObjLogger } from "@scramjet/obj-logger"; import { CommunicationChannel as CC, CPMMessageCode, RunnerMessageCode } from "@scramjet/symbols"; import { ControlMessageCode, @@ -7,6 +8,7 @@ import { EncodedMonitoringMessage, ICommunicationHandler, IObjectLogger, + InstanceConnectionInfo, LoggerOutput, MaybePromise, MessageDataType, @@ -17,7 +19,6 @@ import { UpstreamStreamsConfig, WritableStream } from "@scramjet/types"; -import { ObjLogger } from "@scramjet/obj-logger"; import { DataStream, StringStream } from "scramjet"; import { PassThrough, Readable, Writable } from "stream"; @@ -49,6 +50,7 @@ type MonitoringMessageHandlerList = { type ControlMessageHandlerList = { [RunnerMessageCode.KILL]: ConfiguredMessageHandler[]; [RunnerMessageCode.MONITORING_RATE]: ConfiguredMessageHandler[]; + [RunnerMessageCode.MONITORING_REPLY]: ConfiguredMessageHandler[]; [RunnerMessageCode.STOP]: ConfiguredMessageHandler[]; [RunnerMessageCode.PONG]: ConfiguredMessageHandler[]; [RunnerMessageCode.INPUT_CONTENT_TYPE]: ConfiguredMessageHandler[]; @@ -86,6 +88,7 @@ export class CommunicationHandler implements ICommunicationHandler { this.controlHandlerHash = { [RunnerMessageCode.KILL]: [], [RunnerMessageCode.MONITORING_RATE]: [], + [RunnerMessageCode.MONITORING_REPLY]: [], [RunnerMessageCode.STOP]: [], [RunnerMessageCode.EVENT]: [], [RunnerMessageCode.PONG]: [], @@ -147,11 +150,20 @@ export class CommunicationHandler implements ICommunicationHandler { return this; } + waitForHandshake(): Promise { + return new Promise((res) => { + this.addMonitoringHandler(RunnerMessageCode.PING, (msg) => { + res(msg); + }); + }); + } + pipeMessageStreams() { if (this._piped) { this.logger.error("pipeMessageStreams called twice"); throw new Error("pipeMessageStreams called twice"); } + this._piped = true; if (!this.downstreams || !this.upstreams) { diff --git a/packages/python-runner/hardcoded_magic_values.py b/packages/python-runner/hardcoded_magic_values.py index 12625a6f8..25e42c7a1 100644 --- a/packages/python-runner/hardcoded_magic_values.py +++ b/packages/python-runner/hardcoded_magic_values.py @@ -30,6 +30,6 @@ class RunnerMessageCodes(Enum): STOP = 4001 KILL = 4002 MONITORING_RATE = 4003 - FORCE_CONFIRM_ALIVE = 4004 + MONITORING_REPLY = 4004 EVENT = 5001 diff --git a/packages/python-runner/runner.py b/packages/python-runner/runner.py index 2d0e1e56f..2780b2225 100644 --- a/packages/python-runner/runner.py +++ b/packages/python-runner/runner.py @@ -17,7 +17,8 @@ server_port = os.getenv('INSTANCES_SERVER_PORT') server_host = os.getenv('INSTANCES_SERVER_HOST') or 'localhost' instance_id = os.getenv('INSTANCE_ID') - +runner_connect_info = json.loads(os.getenv("RUNNER_CONNECT_INFO")) +sequence_info = json.loads(os.getenv("SEQUENCE_INFO")) def send_encoded_msg(stream, msg_code, data={}): message = json.dumps([msg_code.value, data]) @@ -116,7 +117,8 @@ async def handshake(self): control = self.streams[CC.CONTROL] self.logger.info(f'Sending PING') - send_encoded_msg(monitoring, msg_codes.PING) + payload = {**runner_connect_info, **{"system":{"processPID":str(os.getpid())}}} + send_encoded_msg(monitoring, msg_codes.PING, {"payload":payload, "sequenceInfo": sequence_info, "id": instance_id}) message = await control.readuntil(b'\n') self.logger.info(f'Got message: {message}') @@ -164,7 +166,7 @@ async def handle_stop(self, data): self.keep_alive_requested = False timeout = data.get('timeout') / 1000 can_keep_alive = data.get('canCallKeepalive') - try: + try: for handler in self.stop_handlers: await handler(timeout, can_keep_alive) except Exception as e: @@ -307,7 +309,7 @@ async def forward_output_stream(self, output): await output.write_to(self.streams[CC.OUT]) - + async def send_keep_alive(self, timeout: int = 0, can_keep_alive: bool = False): monitoring = self.streams[CC.MONITORING] send_encoded_msg(monitoring, msg_codes.ALIVE) @@ -342,7 +344,7 @@ def emit(self, event_name, message=''): msg_codes.EVENT, {'eventName': event_name, 'message': message} ) - + async def keep_alive(self, timeout: int = 0): await self.runner.send_keep_alive(timeout) diff --git a/packages/runner/src/bin/start-runner.ts b/packages/runner/src/bin/start-runner.ts index 520a92ad8..63f1803b2 100755 --- a/packages/runner/src/bin/start-runner.ts +++ b/packages/runner/src/bin/start-runner.ts @@ -2,14 +2,36 @@ import { Runner } from "../runner"; import fs from "fs"; -import { AppConfig } from "@scramjet/types"; +import { AppConfig, SequenceInfo } from "@scramjet/types"; import { HostClient } from "../host-client"; import { RunnerExitCode } from "@scramjet/symbols"; +import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; const sequencePath: string = process.env.SEQUENCE_PATH?.replace(/.js$/, "") + ".js"; const instancesServerPort = process.env.INSTANCES_SERVER_PORT; const instancesServerHost = process.env.INSTANCES_SERVER_HOST; const instanceId = process.env.INSTANCE_ID; +const sequenceInfo = process.env.SEQUENCE_INFO; +const runnerConnectInfo = process.env.RUNNER_CONNECT_INFO; + +let connectInfo: SequenceInfo; +let parsedRunnerConnectInfo: RunnerConnectInfo; + +try { + if (!runnerConnectInfo) throw new Error("Connection JSON is required."); + parsedRunnerConnectInfo = JSON.parse(runnerConnectInfo); +} catch { + console.error("Error while parsing connection information."); + process.exit(RunnerExitCode.INVALID_ENV_VARS); +} + +try { + if (!sequenceInfo) throw new Error("Connection JSON is required."); + connectInfo = JSON.parse(sequenceInfo); +} catch { + console.error("Error while parsing connection information."); + process.exit(RunnerExitCode.INVALID_ENV_VARS); +} if (!instancesServerPort || instancesServerPort !== parseInt(instancesServerPort, 10).toString()) { console.error("Incorrect run argument: instancesServerPort"); @@ -43,7 +65,7 @@ const hostClient = new HostClient(+instancesServerPort, instancesServerHost); * @param fifosPath - fifo files path */ -const runner: Runner = new Runner(sequencePath, hostClient, instanceId); +const runner: Runner = new Runner(sequencePath, hostClient, instanceId, connectInfo, parsedRunnerConnectInfo); runner.main() .catch(e => { diff --git a/packages/runner/src/host-client.ts b/packages/runner/src/host-client.ts index 5635b6dd0..93aeb2104 100644 --- a/packages/runner/src/host-client.ts +++ b/packages/runner/src/host-client.ts @@ -1,9 +1,11 @@ /* eslint-disable dot-notation */ -import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types"; -import { CommunicationChannel as CC } from "@scramjet/symbols"; -import net, { createConnection, Socket } from "net"; import { ObjLogger } from "@scramjet/obj-logger"; +import { CommunicationChannel as CC } from "@scramjet/symbols"; +import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types"; +import { defer } from "@scramjet/utility"; import { Agent } from "http"; +import net, { Socket, createConnection } from "net"; +import { PassThrough } from "stream"; type HostOpenConnections = [ net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket @@ -42,14 +44,24 @@ class HostClient implements IHostClient { async init(id: string): Promise { const openConnections = await Promise.all( Array.from(Array(9)) - .map(() => { + .map((_e: any, i: number) => { // Error handling for each connection is process crash for now - const connection = net.createConnection(this.instancesServerPort, this.instancesServerHost); - - connection.setNoDelay(true); + let connection: Socket; + + try { + connection = net.createConnection(this.instancesServerPort, this.instancesServerHost); + connection.on("error", () => { + this.logger.warn(`${i} Stream error`); + }); + connection.setNoDelay(true); + } catch (e) { + return Promise.reject(e); + } return new Promise(res => { - connection.on("connect", () => res(connection)); + connection.on("connect", () => { + res(connection); + }); }); }) .map((connPromised, index) => { @@ -62,10 +74,32 @@ class HostClient implements IHostClient { return connection; }); }) - ); + ).catch((_e) => { + //@TODO: handle error. + }); this._streams = openConnections as HostOpenConnections; + const input = this._streams[CC.IN]; + + const inputTarget = new PassThrough({ emitClose: false }); + + input.on("end", async () => { + await defer(500); + + if ((this._streams![CC.CONTROL] as net.Socket).readableEnded) { + this.logger.info("Input end. Control is also ended... We are disconnected."); + } else { + this.logger.info("Input end. Control not ended. We are online. Desired input end."); + inputTarget.end(); + } + }); + + input.pipe(inputTarget, { end: false }); + + this._streams[CC.IN] = inputTarget; + //this._streams[CC.STDIN] = this._streams[CC.STDIN].pipe(new PassThrough({ emitClose: false }), { end: false }); + try { this.bpmux = new BPMux(this._streams[CC.PACKAGE]); } catch (e) { @@ -104,13 +138,18 @@ class HostClient implements IHostClient { this.logger.debug("Connected to host"); } - async disconnect() { + async disconnect(hard: boolean) { this.logger.trace("Disconnecting from host"); const streamsExitedPromised: Promise[] = this.streams.map((stream, i) => new Promise( (res) => { - if ("writable" in stream!) { + if ([CC.IN, CC.STDIN, CC.CONTROL].includes(i)) { + res(); + return; + } + + if (!hard && "writable" in stream!) { stream .on("error", (e) => { console.error("Error on stream", i, e.stack); diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index da5f73143..8aefd0b7e 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -1,3 +1,6 @@ +import { RunnerError } from "@scramjet/model"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { InstanceStatus, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols"; import { AppConfig, ApplicationFunction, @@ -6,31 +9,53 @@ import { EncodedMonitoringMessage, EventMessageData, HandshakeAcknowledgeMessageData, + HasTopicInformation, IComponent, IHostClient, + IObjectLogger, MaybePromise, MonitoringRateMessageData, + PangMessageData, + RunnerConnectInfo, + SequenceInfo, StopSequenceMessageData, Streamable, - SynchronousStreamable, - HasTopicInformation, - IObjectLogger + SynchronousStreamable } from "@scramjet/types"; -import { RunnerError } from "@scramjet/model"; -import { ObjLogger } from "@scramjet/obj-logger"; -import { RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols"; -import { defer } from "@scramjet/utility"; +import { defer, promiseTimeout } from "@scramjet/utility"; + +import { HostClient as HostApiClient } from "@scramjet/api-client"; +import { ClientUtilsCustomAgent } from "@scramjet/client-utils"; import { BufferStream, DataStream, StringStream } from "scramjet"; import { EventEmitter } from "events"; +import { WriteStream, createWriteStream, writeFileSync } from "fs"; import { Readable, Writable } from "stream"; import { RunnerAppContext, RunnerProxy } from "./runner-app-context"; import { mapToInputDataStream, readInputStreamHeaders, inputStreamInitLogger } from "./input-stream"; import { MessageUtils } from "./message-utils"; -import { HostClient as HostApiClient } from "@scramjet/api-client"; -import { ClientUtilsCustomAgent } from "@scramjet/client-utils"; + +let exitHandled = false; + +function onBeforeExit(code: number) { + if (exitHandled) return; + + const filepath = `/tmp/runner-${process.pid.toString()}`; + + writeFileSync(filepath, code.toString()); + + exitHandled = true; +} + +function onException(_error: Error) { + console.error(_error); + onBeforeExit(RunnerExitCode.UNCAUGHT_EXCEPTION); +} + +process.once("beforeExit", onBeforeExit); +process.once("uncaughtException", onException); // async function flushStream(source: Readable | undefined, target: Writable) { // if (!source) return; @@ -60,9 +85,27 @@ export function isSynchronousStreamable(obj: SynchronousStreamable | Primit const overrideMap: Map = new Map(); +function revertStandardStream(oldStream: Writable) { + if (overrideMap.has(oldStream)) { + const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig; + + // @ts-ignore - this is ok, we're doing this on purpose! + delete oldStream.write; + + // if prototypic write is there, then no change needed + if (oldStream.write !== write) + oldStream.write = write; + + oldStream.off("drain", drainCb); + oldStream.off("error", errorCb); + overrideMap.delete(oldStream); + } +} + function overrideStandardStream(oldStream: Writable, newStream: Writable) { if (overrideMap.has(oldStream)) { - throw new Error("Attempt to override stream more than once"); + //throw new Error("Attempt to override stream more than once"); + revertStandardStream(oldStream); } const write = oldStream.write; @@ -83,23 +126,6 @@ function overrideStandardStream(oldStream: Writable, newStream: Writable) { overrideMap.set(oldStream, { write, drainCb, errorCb }); } -function revertStandardStream(oldStream: Writable) { - if (overrideMap.has(oldStream)) { - const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig; - - // @ts-ignore - this is ok, we're doing this on purpose! - delete oldStream.write; - - // if prototypic write is there, then no change needed - if (oldStream.write !== write) - oldStream.write = write; - - oldStream.off("drain", drainCb); - oldStream.off("error", errorCb); - overrideMap.delete(oldStream); - } -} - /** * Runtime environment for sequence code. * Communicates with Host with data transferred to/from Sequence, health info, @@ -111,6 +137,7 @@ export class Runner implements IComponent { private monitoringInterval?: NodeJS.Timeout; private keepAliveRequested?: boolean; + private monitoringMessageReplyTimeout?: NodeJS.Timeout; private stopExpected: boolean = false; handshakeResolver?: { res: Function; rej: Function }; @@ -118,15 +145,41 @@ export class Runner implements IComponent { private inputDataStream: DataStream; private outputDataStream: DataStream; + private sequenceInfo: SequenceInfo; + + private connected = false; + private created = Date.now(); + + private requires?: string; + private requiresContentType?: string; + private provides?: string; + private providesContentType?: string; + + private inputContentType: string = ""; + private shouldSerialize = false; + private status: InstanceStatus = InstanceStatus.STARTING; + private logFile?: WriteStream; + + private runnerConnectInfo: RunnerConnectInfo = { + appConfig: {} + }; + + instanceOutput?: Readable & HasTopicInformation | void; constructor( private sequencePath: string, private hostClient: IHostClient, - private instanceId: string + private instanceId: string, + sequenceInfo: SequenceInfo, + runnerConnectInfo: RunnerConnectInfo ) { + this.sequenceInfo = sequenceInfo; this.emitter = new EventEmitter(); + this.runnerConnectInfo = runnerConnectInfo; + this.logger = new ObjLogger(this, { id: instanceId }); + hostClient.logger.pipe(this.logger); inputStreamInitLogger.pipe(this.logger); @@ -134,6 +187,12 @@ export class Runner implements IComponent { this.logger.addOutput(process.stdout); } + if (process.env.RUNNER_LOG_FILE) { + this.logFile ||= createWriteStream(process.env.RUNNER_LOG_FILE); + this.logFile.write("\n\n"); + this.logger.addOutput(this.logFile); + } + this.inputDataStream = new DataStream().catch((e: any) => { this.logger.error("Error during input data stream", e); @@ -160,6 +219,10 @@ export class Runner implements IComponent { async controlStreamHandler([code, data]: EncodedControlMessage) { this.logger.debug("Control message received", code, data); + if (this.monitoringMessageReplyTimeout) { + clearTimeout(this.monitoringMessageReplyTimeout); + } + switch (code) { case RunnerMessageCode.MONITORING_RATE: await this.handleMonitoringRequest(data as MonitoringRateMessageData); @@ -178,6 +241,8 @@ export class Runner implements IComponent { this.emitter.emit(eventData.eventName, eventData.message); break; + case RunnerMessageCode.MONITORING_REPLY: + break; default: break; } @@ -194,11 +259,11 @@ export class Runner implements IComponent { } async setInputContentType(headers: any) { - const contentType = headers["content-type"]; + this.inputContentType ||= headers["content-type"]; - this.logger.debug("Content-Type", contentType); + this.logger.debug("Content-Type", this.inputContentType); - mapToInputDataStream(this.hostClient.inputStream, contentType) + mapToInputDataStream(this.hostClient.inputStream, this.inputContentType) .catch((error: any) => { this.logger.error("mapToInputDataStream", error); // TODO: we should be doing some error handling here: @@ -207,6 +272,8 @@ export class Runner implements IComponent { } async handleMonitoringRequest(data: MonitoringRateMessageData): Promise { + this.logger.info("handleMonitoringRequest"); + if (this.monitoringInterval) { clearInterval(this.monitoringInterval); } @@ -214,24 +281,67 @@ export class Runner implements IComponent { let working = false; this.monitoringInterval = setInterval(async () => { + this.logger.info("working", working); + if (working) { - return; + //return; } working = true; - await this.reportHealth(); + await this.reportHealth(1000); working = false; - }, 1000 / data.monitoringRate).unref(); + }, 1000 / data.monitoringRate);//.unref(); } - private async reportHealth() { + private async reportHealth(timeout?: number) { + this.logger.info("Report health"); + const { healthy } = await this.context.monitor(); + if (timeout) { + this.monitoringMessageReplyTimeout = setTimeout(async () => { + this.logger.warn("Monitoring Reply Timeout"); + + await this.handleDisconnect(); + }, timeout); + } + MessageUtils.writeMessageOnStream( [RunnerMessageCode.MONITORING, { healthy }], this.hostClient.monitorStream ); } + async handleDisconnect() { + if (this.monitoringInterval) { + clearInterval(this.monitoringInterval); + } + + if (this.monitoringMessageReplyTimeout) { + clearTimeout(this.monitoringMessageReplyTimeout); + } + + this.connected = false; + + try { + await this.hostClient.disconnect(!this.connected); + await defer(10000); + } catch (e) { + this.logger.error("Disconnect failed"); + } + + this.logger.info("Reinitializing...."); + + await this.premain(); + + if (this.requires) { + this.sendPang({ requires: this.requires, contentType: this.requiresContentType }); + } + + if (this.provides) { + this.sendPang({ provides: this.provides, contentType: this.providesContentType }); + } + } + async handleKillRequest(): Promise { this.logger.debug("Handling KILL request"); @@ -239,10 +349,14 @@ export class Runner implements IComponent { if (!this.stopExpected) { this.logger.trace(`Exiting (unexpected, ${RunnerExitCode.KILLED})`); + this.status = InstanceStatus.KILLING; + return this.exit(RunnerExitCode.KILLED); } this.logger.trace("Exiting (expected)"); + this.status = InstanceStatus.STOPPING; + return this.exit(RunnerExitCode.STOPPED); } @@ -263,6 +377,8 @@ export class Runner implements IComponent { } if (!data.canCallKeepalive || !this.keepAliveRequested) { + this.status = InstanceStatus.STOPPING; + MessageUtils.writeMessageOnStream( [RunnerMessageCode.SEQUENCE_STOPPED, { sequenceError }], this.hostClient.monitorStream ); @@ -276,21 +392,42 @@ export class Runner implements IComponent { } private async exit(exitCode?: number) { - //TODO: we need to wait a bit for the logs to flush - we shouldn't need to as cleanup should wait. await defer(200); this.cleanup() .then((code) => { process.exitCode = exitCode || code; }, (e) => console.error(e?.stack)) - .finally(() => process.exit()); + .finally(() => { onBeforeExit(process.exitCode!); process.exit(); }); } - async main() { - await this.hostClient.init(this.instanceId); + async premain(): Promise<{ appConfig: AppConfig, args: any}> { + this.logger.debug("premain"); + + try { + this.logger.debug("connecting..."); + await promiseTimeout(this.hostClient.init(this.instanceId), 10000); + this.logger.debug("connected"); + this.connected = true; + + await this.handleMonitoringRequest({ monitoringRate: 1 }); + } catch (e) { + this.connected = false; + this.logger.warn("Can't connect to Host", e); + await defer(10000); + + return await this.premain(); + } + + this.logger.debug("Redirecting outputs"); this.redirectOutputs(); + this.logger.debug("Defining control stream"); this.defineControlStream(); + if (this.inputContentType) { + await this.setInputContentType({ headers: { "content-type": this.inputContentType } }); + } + this.hostClient.stdinStream .on("data", (chunk) => process.stdin.unshift(chunk)) .on("end", () => process.stdin.emit("end")); @@ -302,9 +439,18 @@ export class Runner implements IComponent { this.sendHandshakeMessage(); - const { appConfig, args } = await this.waitForHandshakeResponse(); + const { args, appConfig } = this.runnerConnectInfo; - this.logger.debug("Handshake received"); + return { appConfig, args }; + } + + sendPang(args: PangMessageData) { + MessageUtils.writeMessageOnStream( + [RunnerMessageCode.PANG, args], this.hostClient.monitorStream); + } + + async main() { + const { appConfig, args } = await this.premain(); this.initAppContext(appConfig as X); @@ -315,16 +461,14 @@ export class Runner implements IComponent { try { sequence = this.getSequence(); - // this.logger.debug("Sequence", sequence); if (sequence.length && typeof sequence[0] !== "function") { this.logger.debug("First Sequence object is not a function:", sequence[0]); - MessageUtils.writeMessageOnStream( - [RunnerMessageCode.PANG, { - requires: sequence[0].requires, - contentType: sequence[0].contentType - }], this.hostClient.monitorStream); + this.requires = sequence[0].requires; + this.requiresContentType = sequence[0].contentType; + + this.sendPang({ requires: this.requires, contentType: this.requiresContentType }); this.logger.trace("Waiting for input stream"); @@ -354,6 +498,8 @@ export class Runner implements IComponent { this.logger.error("Sequence error:", error.stack); } + this.status = InstanceStatus.ERRORED; + return this.exit(RunnerExitCode.SEQUENCE_FAILED_ON_START); } @@ -361,13 +507,18 @@ export class Runner implements IComponent { await this.runSequence(sequence, args); this.logger.trace(`Sequence completed. Waiting ${this.context.exitTimeout}ms with exit.`); + + this.status = InstanceStatus.COMPLETED; this.writeMonitoringMessage([RunnerMessageCode.SEQUENCE_COMPLETED, { timeout: this.context.exitTimeout }]); await defer(this.context.exitTimeout); + return this.exit(0); } catch (error: any) { this.logger.error("Error occurred during Sequence execution: ", error.stack); + this.status = InstanceStatus.ERRORED; + return this.exit(RunnerExitCode.SEQUENCE_FAILED_DURING_EXECUTION); } } @@ -382,13 +533,18 @@ export class Runner implements IComponent { this.logger.trace("Monitoring interval removed"); } + if (this.monitoringMessageReplyTimeout) { + clearTimeout(this.monitoringMessageReplyTimeout); + this.logger.trace("Monitoring reply check removed"); + } + let exitcode = 0; try { this.logger.info("Cleaning up streams"); - - await this.hostClient.disconnect(); } catch (e: any) { + this.status = InstanceStatus.ERRORED; + exitcode = RunnerExitCode.CLEANUP_FAILED; } @@ -397,17 +553,29 @@ export class Runner implements IComponent { private async revertOutputs() { this.logger.unpipe(this.hostClient.logStream); + revertStandardStream(process.stdout); revertStandardStream(process.stderr); + this.logger.addOutput(process.stderr); } private redirectOutputs() { this.logger.pipe(this.hostClient.logStream, { stringified: true }); + + if (!this.shouldSerialize) { + this.instanceOutput?.pipe(this.hostClient.outputStream); + } + this.outputDataStream .JSONStringify() .pipe(this.hostClient.outputStream); + if (process.env.PRINT_TO_STDOUT) { + process.stdout.pipe(this.logFile!); + process.stderr.pipe(this.logFile!); + } + overrideStandardStream(process.stdout, this.hostClient.stdoutStream); overrideStandardStream(process.stderr, this.hostClient.stderrStream); } @@ -455,7 +623,21 @@ export class Runner implements IComponent { } sendHandshakeMessage() { - MessageUtils.writeMessageOnStream([RunnerMessageCode.PING, {}], this.hostClient.monitorStream); + // TODO: send connection info + MessageUtils.writeMessageOnStream([ + RunnerMessageCode.PING, { + id: this.instanceId, + sequenceInfo: this.sequenceInfo, + created: this.created, + payload: { + ...this.runnerConnectInfo, + system: { + processPID: process.pid.toString() + } + }, + status: this.status, + inputHeadersSent: !!this.inputContentType + }], this.hostClient.monitorStream); this.logger.trace("Handshake sent"); } @@ -494,9 +676,9 @@ export class Runner implements IComponent { * * Pass the input stream to stream instead of creating new DataStream(); */ - let stream: Readable & HasTopicInformation | void = this.inputDataStream; + this.instanceOutput = this.inputDataStream; let itemsLeftInSequence = sequence.length; - let intermediate: SynchronousStreamable | void = stream; + let intermediate: SynchronousStreamable | void = this.instanceOutput; for (const func of sequence) { itemsLeftInSequence--; @@ -506,9 +688,11 @@ export class Runner implements IComponent { try { this.logger.debug("Processing function on index", sequence.length - itemsLeftInSequence - 1); + this.status = InstanceStatus.RUNNING; + out = func.call( this.context, - stream, + this.instanceOutput, ...args ); @@ -516,6 +700,8 @@ export class Runner implements IComponent { } catch (error: any) { this.logger.error("Function errored", sequence.length - itemsLeftInSequence, error.stack); + this.status = InstanceStatus.ERRORED; + throw new RunnerError("SEQUENCE_RUNTIME_ERROR"); } @@ -523,18 +709,21 @@ export class Runner implements IComponent { intermediate = await out; this.logger.info("Function output type", sequence.length - itemsLeftInSequence - 1, typeof out); + if (!intermediate) { this.logger.error("Sequence ended premature"); + this.status = InstanceStatus.ERRORED; + throw new RunnerError("SEQUENCE_ENDED_PREMATURE"); } else if (typeof intermediate === "object" && intermediate instanceof DataStream) { this.logger.debug("Sequence function returned DataStream.", sequence.length - itemsLeftInSequence - 1); - stream = intermediate; + this.instanceOutput = intermediate; } else { this.logger.debug("Sequence function returned readable", sequence.length - itemsLeftInSequence - 1); // TODO: what if this is not a DataStream, but BufferStream stream!!!! - stream = DataStream.from(intermediate as Readable); + this.instanceOutput = DataStream.from(intermediate as Readable); } } else { this.logger.info("All Sequences processed."); @@ -542,17 +731,17 @@ export class Runner implements IComponent { intermediate = await out; if (intermediate instanceof Readable) { - stream = intermediate; + this.instanceOutput = intermediate; } else if (intermediate !== undefined && isSynchronousStreamable(intermediate)) { - stream = Object.assign(DataStream.from(intermediate as Readable, { highWaterMark: 0 }), { + this.instanceOutput = Object.assign(DataStream.from(intermediate as Readable, { highWaterMark: 0 }), { topic: intermediate.topic, contentType: intermediate.contentType }); } else { - stream = undefined; + this.instanceOutput = undefined; } - this.logger.debug("Stream type is", typeof stream); + this.logger.debug("Stream type is", typeof this.instanceOutput); } } @@ -571,41 +760,32 @@ export class Runner implements IComponent { this.hostClient.outputStream.end(`${intermediate}`); - MessageUtils.writeMessageOnStream( - [RunnerMessageCode.PANG, { - provides: "", - contentType: "" - }], - this.hostClient.monitorStream, - ); + this.sendPang({ provides: "", contentType: "" }); res(); - } else if (stream && this.hostClient.outputStream) { - this.logger.trace("Piping Sequence output", typeof stream); + } else if (this.instanceOutput && this.hostClient.outputStream) { + this.logger.trace("Piping Sequence output", typeof this.instanceOutput); - const shouldSerialize = stream.contentType && - ["application/x-ndjson", "text/x-ndjson"].includes(stream.contentType) || - stream instanceof DataStream && !( - stream instanceof StringStream || stream instanceof BufferStream + this.shouldSerialize = this.instanceOutput.contentType && + ["application/x-ndjson", "text/x-ndjson"].includes(this.instanceOutput.contentType) || + this.instanceOutput instanceof DataStream && !( + this.instanceOutput instanceof StringStream || this.instanceOutput instanceof BufferStream ); - stream + this.instanceOutput .once("end", () => { this.logger.debug("Sequence stream ended"); res(); }) - .pipe(shouldSerialize + .pipe(this.shouldSerialize ? this.outputDataStream : this.hostClient.outputStream ); - MessageUtils.writeMessageOnStream( - [RunnerMessageCode.PANG, { - provides: intermediate.topic || "", - contentType: intermediate.contentType || "" - }], - this.hostClient.monitorStream, - ); + this.provides = intermediate.topic || ""; + this.providesContentType = intermediate.contentType || ""; + + this.sendPang({ provides: this.provides, contentType: this.providesContentType }); } else { // TODO: this should push a PANG message with the sequence description this.logger.debug("Sequence did not output a stream"); diff --git a/packages/symbols/src/instance-status-code.ts b/packages/symbols/src/instance-status-code.ts index 74a215472..7e96d2709 100644 --- a/packages/symbols/src/instance-status-code.ts +++ b/packages/symbols/src/instance-status-code.ts @@ -1,5 +1,7 @@ export enum InstanceMessageCode { INSTANCE_STARTED, INSTANCE_STOPPED, - INSTANCE_ENDED + INSTANCE_ENDED, + INSTANCE_TERMINATED, + INSTANCE_CONNECTED } diff --git a/packages/symbols/src/instance-status.ts b/packages/symbols/src/instance-status.ts index 1b74db37e..4fe1f0ae3 100644 --- a/packages/symbols/src/instance-status.ts +++ b/packages/symbols/src/instance-status.ts @@ -1,10 +1,10 @@ - export const enum InstanceStatus { INITIALIZING = "initializing", STARTING = "starting", RUNNING = "running", STOPPING = "stopping", KILLING = "killing", - COMPLETED = "completed", + COMPLETED ="completed", ERRORED = "errored", + GONE = "gone" } diff --git a/packages/symbols/src/runner-exit-code.ts b/packages/symbols/src/runner-exit-code.ts index 3e7bcf11c..1c33d8b43 100644 --- a/packages/symbols/src/runner-exit-code.ts +++ b/packages/symbols/src/runner-exit-code.ts @@ -8,5 +8,6 @@ export enum RunnerExitCode { STOPPED = 138, SUCCESS = 0, CLEANUP_FAILED = 223, - PODS_LIMIT_REACHED = 24 + PODS_LIMIT_REACHED = 24, + UNCAUGHT_EXCEPTION = 101 } diff --git a/packages/symbols/src/runner-message-code.ts b/packages/symbols/src/runner-message-code.ts index c2bce5ede..9c96dff62 100644 --- a/packages/symbols/src/runner-message-code.ts +++ b/packages/symbols/src/runner-message-code.ts @@ -15,5 +15,6 @@ export enum RunnerMessageCode { STOP = 4001, KILL = 4002, MONITORING_RATE = 4003, + MONITORING_REPLY = 4004, EVENT = 5001, } diff --git a/packages/types/src/communication-handler.ts b/packages/types/src/communication-handler.ts index f1e844f5a..eae6f8007 100644 --- a/packages/types/src/communication-handler.ts +++ b/packages/types/src/communication-handler.ts @@ -7,6 +7,7 @@ import { MonitoringMessageCode, UpstreamStreamsConfig } from "./message-streams"; import { MaybePromise } from "./utils"; +import { InstanceConnectionInfo } from "./instance"; export type MonitoringMessageHandler = (msg: EncodedMessage) => void; @@ -18,6 +19,8 @@ export type ControlMessageHandler = export interface ICommunicationHandler { logger: IObjectLogger; + waitForHandshake(): Promise; + hookUpstreamStreams(str: UpstreamStreamsConfig): this; hookDownstreamStreams(str: DownstreamStreamsConfig): this; diff --git a/packages/types/src/csh-connector.ts b/packages/types/src/csh-connector.ts index 7c6f992fb..a239c8726 100644 --- a/packages/types/src/csh-connector.ts +++ b/packages/types/src/csh-connector.ts @@ -1,4 +1,3 @@ -import { MaybePromise } from "./utils"; import { IComponent } from "./component"; import { CommunicationChannel as CC } from "@scramjet/symbols"; import { UpstreamStreamsConfig } from "./message-streams"; @@ -9,12 +8,12 @@ export interface IHostClient extends IComponent { * Interface used by Runner to communicate with Host. */ - init(id: string): MaybePromise; + init(id: string): Promise; /** * Disconnects from a host server. */ - disconnect(): Promise; + disconnect(hard: boolean): Promise; getAgent(): Agent; stdinStream: UpstreamStreamsConfig[CC.STDIN] diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index d5beb32b6..9f8f717f4 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -25,6 +25,7 @@ export * from "./module-loader"; export * from "./object-logger"; export * from "./op-response"; export * from "./runner-config"; +export * from "./runner-connect"; export * from "./runner"; export * from "./sequence"; export * from "./utils"; diff --git a/packages/types/src/instance.ts b/packages/types/src/instance.ts index 5ece3d3d4..d0b7eb229 100644 --- a/packages/types/src/instance.ts +++ b/packages/types/src/instance.ts @@ -1,3 +1,7 @@ export type InstanceId = string; export type InstanceArgs = any[]; + +export type InstanceConnectionInfo = { + +} diff --git a/packages/types/src/lifecycle-adapters.ts b/packages/types/src/lifecycle-adapters.ts index 3db5800fd..5d5d17ac4 100644 --- a/packages/types/src/lifecycle-adapters.ts +++ b/packages/types/src/lifecycle-adapters.ts @@ -3,6 +3,8 @@ import { MaybePromise } from "./utils"; import { InstanceConfig } from "./runner-config"; import { IObjectLogger } from "./object-logger"; import { InstanceLimits } from "./instance-limits"; +import { SequenceInfo } from "./sequence-adapter"; +import { RunnerConnectInfo } from "./runner-connect"; export type ExitCode = number; @@ -23,24 +25,36 @@ export interface ILifeCycleAdapterMain { // TODO: THIS is forceful removal - let's think about refactor. remove(): MaybePromise; - getCrashLog(): Promise + monitorRate(rps: number): this; + + stats(msg: MonitoringMessageData): Promise; + + getCrashLog(): Promise; + + waitUntilExit(config: InstanceConfig | undefined, instanceId: string, sequenceInfo: SequenceInfo): Promise; } // @TODO create ISequenceAdapter interface export interface ILifeCycleAdapterRun extends ILifeCycleAdapterMain { + setRunner?(system: RunnerConnectInfo["system"]): MaybePromise; + limits: InstanceLimits; /** - * Starts Runner. + * Initiates runner start without waiting for the result * * @param {InstanceConfig} Runner configuration. * @returns {ExitCode} Runner exit code. */ - run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise; - - monitorRate(rps: number): this; + dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise; - stats(msg: MonitoringMessageData): Promise; + /** + * Starts Runner - in essence does `dispatch` and then `waitUntilExit`. + * + * @param {InstanceConfig} Runner configuration. + * @returns {ExitCode} Runner exit code. + */ + run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise; } export type LifeCycleError = any | (Error & { exitCode?: number, errorMessage?: string }); diff --git a/packages/types/src/message-streams.ts b/packages/types/src/message-streams.ts index edc0ceef5..96a157a68 100644 --- a/packages/types/src/message-streams.ts +++ b/packages/types/src/message-streams.ts @@ -43,6 +43,7 @@ import { LoadCheckStat } from "./load-check-stat"; import { NetworkInfo } from "./network-info"; import { SequenceCompleteMessageData } from "./messages/sequence-complete"; import { KillMessageData } from "./messages/kill-sequence"; +import { MonitoringReplyMessage, MonitoringReplyMessageData } from "./messages/monitor-reply"; export type MessageType = T extends RunnerMessageCode.ACKNOWLEDGE ? AcknowledgeMessage : @@ -53,6 +54,7 @@ export type MessageType = T extends RunnerMessageCode.KILL ? KillSequenceMessage : T extends RunnerMessageCode.MONITORING ? MonitoringMessage : T extends RunnerMessageCode.MONITORING_RATE ? MonitoringRateMessage : + T extends RunnerMessageCode.MONITORING_REPLY ? MonitoringReplyMessage : T extends RunnerMessageCode.STOP ? StopSequenceMessage : T extends RunnerMessageCode.PING ? HandshakeMessage : T extends RunnerMessageCode.PONG ? HandshakeAcknowledgeMessage : @@ -71,6 +73,7 @@ export type MessageDataType = T extends RunnerMessageCode.KILL ? KillMessageData : T extends RunnerMessageCode.MONITORING ? MonitoringMessageData : T extends RunnerMessageCode.MONITORING_RATE ? MonitoringRateMessageData : + T extends RunnerMessageCode.MONITORING_REPLY ? MonitoringReplyMessageData : T extends RunnerMessageCode.STOP ? StopSequenceMessageData : T extends RunnerMessageCode.PING ? PingMessageData : T extends RunnerMessageCode.PONG ? HandshakeAcknowledgeMessageData : @@ -93,7 +96,7 @@ export type EncodedMessage< > = [T, MessageDataType]; export type ControlMessageCode = - RunnerMessageCode.KILL | RunnerMessageCode.MONITORING_RATE | RunnerMessageCode.STOP | RunnerMessageCode.EVENT | + RunnerMessageCode.KILL | RunnerMessageCode.MONITORING_RATE | RunnerMessageCode.MONITORING_REPLY | RunnerMessageCode.STOP | RunnerMessageCode.EVENT | RunnerMessageCode.PONG | CPMMessageCode.STH_ID | CPMMessageCode.KEY_REVOKED | CPMMessageCode.LIMIT_EXCEEDED | CPMMessageCode.ID_DROP | RunnerMessageCode.INPUT_CONTENT_TYPE; diff --git a/packages/types/src/messages/handshake.ts b/packages/types/src/messages/handshake.ts index 07494eb7a..338d9c4af 100644 --- a/packages/types/src/messages/handshake.ts +++ b/packages/types/src/messages/handshake.ts @@ -1,16 +1,31 @@ -import { RunnerMessageCode } from "@scramjet/symbols"; +import { InstanceStatus, RunnerMessageCode } from "@scramjet/symbols"; +import { SequenceInfo } from "../sequence-adapter"; +import { StartSequencePayload } from "../rest-api-sth"; /** * Runner sends a handshake message to the Cloud Server Host (CSH) after it is. * Runner is then waiting to receive the handshake acknowledge message back (PONG) * from the CSH to start the Sequence. */ -export type HandshakeMessage = { msgCode: RunnerMessageCode.PING }; +export type HandshakeMessage = { + msgCode: RunnerMessageCode.PING, + sequence: SequenceInfo, + payload: StartSequencePayload, + sequenceInfo: SequenceInfo +}; -export type PingMessageData = { ports?: Record } +export type PingMessageData = { + id: string; + ports?: Record; + payload: StartSequencePayload; + sequenceInfo: SequenceInfo; + created: number; + status: InstanceStatus; + inputHeadersSent: boolean; +}; export type PangMessageData = { - requires?: string, - contentType?: string, - provides?: string -} + requires?: string; + contentType?: string; + provides?: string; +}; diff --git a/packages/types/src/messages/monitor-reply.ts b/packages/types/src/messages/monitor-reply.ts new file mode 100644 index 000000000..699b35f0a --- /dev/null +++ b/packages/types/src/messages/monitor-reply.ts @@ -0,0 +1,9 @@ +import { RunnerMessageCode } from "@scramjet/symbols"; + +export type MonitoringReplyMessageData = {}; + +/** + * Message instructing Runner how often to emit monitoring messages. + * This message type is sent from CSIController. + */ +export type MonitoringReplyMessage = { msgCode: RunnerMessageCode.MONITORING_REPLY} & MonitoringReplyMessageData; diff --git a/packages/types/src/rest-api-sth/start-sequence.ts b/packages/types/src/rest-api-sth/start-sequence.ts index 1182fe6b0..fc9454200 100644 --- a/packages/types/src/rest-api-sth/start-sequence.ts +++ b/packages/types/src/rest-api-sth/start-sequence.ts @@ -1,13 +1,5 @@ -import { AppConfig } from "../app-config"; -import { InstanceLimits } from "../instance-limits"; +import { RunnerConnectInfo } from "../runner-connect"; export type StartSequenceResponse = { id: string }; -export type StartSequencePayload = { - appConfig: AppConfig; - args?: any[]; - outputTopic?: string; - inputTopic?: string; - limits?: InstanceLimits; - instanceId?: string; -}; +export type StartSequencePayload = Omit, "inputContentType">; diff --git a/packages/types/src/runner-connect.ts b/packages/types/src/runner-connect.ts new file mode 100644 index 000000000..6b101e292 --- /dev/null +++ b/packages/types/src/runner-connect.ts @@ -0,0 +1,12 @@ +import { AppConfig } from "./app-config"; +import { InstanceLimits } from "./instance-limits"; + +export type RunnerConnectInfo = { + appConfig: AppConfig; + args?: any[]; + outputTopic?: string; + inputTopic?: string; + limits?: InstanceLimits; + instanceId?: string; + system?: Record; +} diff --git a/packages/utility/src/index.ts b/packages/utility/src/index.ts index e03d1fd2a..65b4a1936 100644 --- a/packages/utility/src/index.ts +++ b/packages/utility/src/index.ts @@ -1,16 +1,16 @@ +export * from "./config"; +export * from "./constants"; export * from "./defer"; +export * from "./file"; export * from "./free-ports-finder"; +export * from "./keygen"; export * from "./merge"; +export * from "./normalize-url"; export * from "./promise-timeout"; -export * from "./read-streamed-json"; -export * from "./typeguards"; -export * from "./typed-emitter"; +export * from "./process-env"; export * from "./read-json-file"; -export * from "./normalize-url"; +export * from "./read-streamed-json"; export * from "./stream-to-string"; -export * from "./config"; -export * from "./file"; -export * from "./constants"; +export * from "./typed-emitter"; +export * from "./typeguards"; export * from "./validators"; -export * from "./keygen"; -export * from "./process-env"; diff --git a/yarn.lock b/yarn.lock index 74ab7613a..ff7a30737 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6821,10 +6821,10 @@ pad-right@^0.2.2: dependencies: repeat-string "^1.5.2" -papaparse@^5.3.2: - version "5.3.2" - resolved "https://registry.npmjs.org/papaparse/-/papaparse-5.3.2.tgz" - integrity sha512-6dNZu0Ki+gyV0eBsFKJhYr+MdQYAzFUGlBMNj3GNrmHxmz1lfRa24CjFObPXtjcetlOv5Ad299MhIK0znp3afw== +papaparse@^5.4.1: + version "5.4.1" + resolved "https://registry.yarnpkg.com/papaparse/-/papaparse-5.4.1.tgz#f45c0f871853578bd3a30f92d96fdcfb6ebea127" + integrity sha512-HipMsgJkZu8br23pW15uvo6sib6wne/4woLZPlFf3rpDyMe9ywEXUsuD7+6K9PRkJlVT51j/sCOYDKGGS3ZJrw== parent-module@^1.0.0: version "1.0.1" @@ -7484,19 +7484,19 @@ safe-stable-stringify@^2.3.1: resolved "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz" integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg== -scramjet-core@^4.32.10: - version "4.32.10" - resolved "https://registry.npmjs.org/scramjet-core/-/scramjet-core-4.32.10.tgz" - integrity sha512-B4xYXl8+sT3Fy/DAKmGE1TJafHm+U46AUY3w0v4XseNwhy94C+JfzKR/9+F1gmEihn+E7tDPXnqlquGDmQ6o8Q== +scramjet-core@^4.32.12: + version "4.32.12" + resolved "https://registry.yarnpkg.com/scramjet-core/-/scramjet-core-4.32.12.tgz#d049a2b2cb4a2635f2f30e56d54f6b0367cf21d5" + integrity sha512-FkNaZqzXvzqdwrUWzMztJq2RUBcpBlm08zOYIhA69+//FzgrespLBz7DmCXdXfujjvmUIFGgq/T3aPFy1ctonw== -scramjet@^4.36.9: - version "4.36.9" - resolved "https://registry.npmjs.org/scramjet/-/scramjet-4.36.9.tgz" - integrity sha512-vHRQy3hE1gC3tYDKPLGmvQgEQV4oL3JpOA7tP07eGcez+Py2mX8yFu81OeSBPwSwjHX5BrAe8pXVkgT3CMkC/g== +scramjet@^4.36.6, scramjet@^4.36.9: + version "4.37.0" + resolved "https://registry.yarnpkg.com/scramjet/-/scramjet-4.37.0.tgz#2e89f07cbaffd1f9cdd5a3da64aba250745aac13" + integrity sha512-Y6b59qGsulkr5MxiVn9CABnL9pE/sPKihCcWSUhzZc6W0YWbfLWRXc1fE1M40QKfOQUBxks81efzJ7WpEuFmlQ== dependencies: - papaparse "^5.3.2" + papaparse "^5.4.1" rereadable-stream "^1.4.14" - scramjet-core "^4.32.10" + scramjet-core "^4.32.12" seed-random@~2.2.0: version "2.2.0"