From cde44fa4848e068b57019baf44dacb6e260297f4 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Wed, 30 Aug 2023 08:36:00 +0000 Subject: [PATCH 01/11] Changes in adapters in order to store parentID --- .../adapters/src/docker-sequence-adapter.ts | 29 ++++++++++++-- .../adapters/src/dockerode-docker-helper.ts | 22 +++++++++- .../src/kubernetes-sequence-adapter.ts | 5 ++- .../adapters/src/process-sequence-adapter.ts | 7 +++- packages/adapters/src/types.ts | 14 ++++++- packages/host/src/lib/cpm-connector.ts | 2 +- packages/host/src/lib/host.ts | 40 ++++++++++++------- packages/types/src/messages/sequence.ts | 3 +- .../src/rest-api-manager/get-sequence.ts | 1 + .../types/src/rest-api-sth/get-sequence.ts | 1 + packages/types/src/runner-config.ts | 1 + packages/types/src/sequence-adapter.ts | 3 +- 12 files changed, 100 insertions(+), 28 deletions(-) diff --git a/packages/adapters/src/docker-sequence-adapter.ts b/packages/adapters/src/docker-sequence-adapter.ts index a68360c42..cc2b13562 100644 --- a/packages/adapters/src/docker-sequence-adapter.ts +++ b/packages/adapters/src/docker-sequence-adapter.ts @@ -108,6 +108,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0); const ret = await this.parsePackage(streams, wait, volume); + const parentId = await this.dockerHelper.getLabelValue(volume, "parentId"); if (!ret.id) { return undefined; @@ -115,6 +116,10 @@ class DockerSequenceAdapter implements ISequenceAdapter { this.logger.info("Identified image for volume", { volume, image: ret.container?.image }); + if (parentId) { + ret.parent_id = parentId; + } + return ret; } catch (e: any) { this.logger.error("Docker failed", e.message, volume); @@ -132,16 +137,23 @@ class DockerSequenceAdapter implements ISequenceAdapter { * @param {Readable} stream Stream containing sequence to be identified. * @param {string} id Id for the new docker volume where sequence will be stored. * @param {boolean} override Removes previous sequence + * @param {string} parentId Id which indicates sequence's source. + * @returns {Promise} Promise resolving to sequence config. */ - async identify(stream: Readable, id: string, override = false): Promise { + async identify(stream: Readable, id: string, override = false, parentId: string): Promise { const volStart = new Date(); if (override) { await this.dockerHelper.removeVolume(id); } - const volumeId = await this.createVolume(id); + const volumeId = await this.createVolume(id, parentId); + //const parentId = await this.dockerHelper.getLabelValue(volumeId, "parentId"); + console.log("---------------------"); + console.log("IDENTIFY", id); + //console.log("VOLUME", parentId); + console.log("---------------------"); const volSecs = (new Date().getTime() - volStart.getTime()) / 1000; @@ -188,6 +200,9 @@ class DockerSequenceAdapter implements ISequenceAdapter { const config = await this.parsePackage(streams, wait, volumeId); await this.fetch(config.container.image); + if (parentId) { + config.parent_id = parentId; + } return config; } catch (err: any) { @@ -204,11 +219,12 @@ class DockerSequenceAdapter implements ISequenceAdapter { * Creates volume with provided id. * * @param {string} id Volume id. + * @param {string} parentId Sequence's parentId. * @returns {DockerVolume} Created volume. */ - private async createVolume(id: string): Promise { + private async createVolume(id: string, parentId?: string): Promise { try { - return await this.dockerHelper.createVolume(id); + return await this.dockerHelper.createVolume(id, parentId); } catch (error: any) { this.logger.error("Error creating volume", id); @@ -234,6 +250,8 @@ class DockerSequenceAdapter implements ISequenceAdapter { const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any; + console.log("result: ", preRunnerResult); + const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000; appendFile("timing-log.ndjson", JSON.stringify({ @@ -249,6 +267,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { throw new SequenceAdapterError("PRERUNNER_ERROR", preRunnerResult.error); } + console.log("volumy:", this.dockerHelper.listVolumes()); const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {}; @@ -259,6 +278,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { ? this.config.docker.runnerImages.python3 : this.config.docker.runnerImages.node; + console.log("runner: ", validPackageJson); return { type: "docker", container, @@ -269,6 +289,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { sequenceDir: PACKAGE_DIR, entrypointPath: validPackageJson.main, id: volumeId, + parent_id: volumeId, description: validPackageJson.description, author: validPackageJson.author, keywords: validPackageJson.keywords, diff --git a/packages/adapters/src/dockerode-docker-helper.ts b/packages/adapters/src/dockerode-docker-helper.ts index d87351d8f..157e7a408 100644 --- a/packages/adapters/src/dockerode-docker-helper.ts +++ b/packages/adapters/src/dockerode-docker-helper.ts @@ -250,13 +250,15 @@ export class DockerodeDockerHelper implements IDockerHelper { * Creates docker volume. * * @param name Volume name. Optional. If not provided, volume will be named with unique name. + * @param parentId Volume parentId. Optional. If not provided, volume will benamed the same as the name. * @returns Volume name. */ - async createVolume(name: string = ""): Promise { + async createVolume(name: string = "", parentId: string = name): Promise { return this.dockerode.createVolume({ Name: name, Labels: { - "org.scramjet.host.is-sequence": "true" + "org.scramjet.host.is-sequence": "true", + parentId : parentId } }).then((volume: Dockerode.Volume) => { return volume.name; @@ -278,9 +280,25 @@ export class DockerodeDockerHelper implements IDockerHelper { filters: { label: { "org.scramjet.host.is-sequence": true } } }); + Volumes.forEach( + volume => { console.log(volume); }); return Volumes.map(volume => volume.Name); } + async getLabelValue(volumeName: string, labelName: string): Promise { + try { + // Get information about the Docker volume + const volumeInfo = await this.dockerode.getVolume(volumeName).inspect(); + + // Access the labels property and retrieve the specific label + const labelValue = volumeInfo.Labels ? volumeInfo.Labels[labelName] : null; + + return labelValue; + } catch (error) { + this.logger.error(`Error reading Docker volume label: ${error}`); + return null; + } + } /** * Attaches to container streams. * diff --git a/packages/adapters/src/kubernetes-sequence-adapter.ts b/packages/adapters/src/kubernetes-sequence-adapter.ts index 5fc3d97d4..b9281fab4 100644 --- a/packages/adapters/src/kubernetes-sequence-adapter.ts +++ b/packages/adapters/src/kubernetes-sequence-adapter.ts @@ -38,6 +38,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin version: validPackageJson.version ?? "", name: validPackageJson.name ?? "", id, + parent_id: id, sequenceDir, engines, description: validPackageJson.description, @@ -111,9 +112,11 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { * @param {Readable} stream Stream with packed sequence. * @param {string} id Sequence Id. * @param {boolean} override Removes previous sequence + * @param {string} parentId Sequence's parentId. + * @returns {Promise} Promise resolving to identified sequence configuration. */ - async identify(stream: Readable, id: string, override = false): Promise { + async identify(stream: Readable, id: string, override = false, parentId?: string,): Promise { // 1. Unpack package.json to stdout and map to config // 2. Create compressed package on the disk const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id); diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index 955b19a50..7696df7a2 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -31,7 +31,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; - + console.log("runner: ", validPackageJson); return { type: "process", engines, @@ -39,6 +39,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin version: validPackageJson.version ?? "", name: validPackageJson.name ?? "", id, + parent_id: id, sequenceDir, description: validPackageJson.description, author: validPackageJson.author, @@ -100,9 +101,11 @@ class ProcessSequenceAdapter implements ISequenceAdapter { * @param {Readable} stream Stream with packed sequence. * @param {string} id Sequence Id. * @param {boolean} override Removes previous sequence + * @param {string} parentId Sequence's parentId. + * @returns {Promise} Promise resolving to identified sequence configuration. */ - async identify(stream: Readable, id: string, override = false): Promise { + async identify(stream: Readable, id: string,override = false, parentId?: string): Promise { const sequenceDir = path.join(this.config.sequencesRoot, id); if (override) { diff --git a/packages/adapters/src/types.ts b/packages/adapters/src/types.ts index b5ec64fbc..b384547ff 100644 --- a/packages/adapters/src/types.ts +++ b/packages/adapters/src/types.ts @@ -176,6 +176,7 @@ export type DockerAdapterRunResponse = { containerId: DockerContainer }; export interface IDockerHelper { + [x: string]: any; logger: IObjectLogger; /** @@ -249,7 +250,16 @@ export interface IDockerHelper { * @returns {Promise} List of existing volumes */ listVolumes: () => Promise; - + /** + * Get value of a certain label in scramjet's volume with the proviede id + * + * @param {string} volumeName Volume name. + * + * @param {string} labelName label name. + * + * @returns {Promise} The value of a label (null if it doesnt exist) + */ + getLabelValue: (volumeName: string, labelName: string) => Promise /** * Creates volume. * @@ -257,7 +267,7 @@ export interface IDockerHelper { * * @returns {Promise} Created volume. */ - createVolume: (name?: string) => Promise; + createVolume: (name?: string, parentId?: string) => Promise; /** * Removes volume. diff --git a/packages/host/src/lib/cpm-connector.ts b/packages/host/src/lib/cpm-connector.ts index 18364916b..af94ba2c9 100644 --- a/packages/host/src/lib/cpm-connector.ts +++ b/packages/host/src/lib/cpm-connector.ts @@ -558,7 +558,7 @@ export class CPMConnector extends TypedEmitter { this.logger.trace("Send sequence status update", sequenceId, seqStatus); await this.communicationStream?.whenWrote( - [CPMMessageCode.SEQUENCE, { id: sequenceId, status: seqStatus, config }] + [CPMMessageCode.SEQUENCE, { id: sequenceId, status: seqStatus, config: config, }] ); this.logger.trace("Sequence status update sent", sequenceId, seqStatus); diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 6c837948b..98526529b 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -747,9 +747,9 @@ export class Host implements IComponent { if (this.config.host.id) { // eslint-disable-next-line max-len - this.sequenceStore.set({ id: config.id, config: config, instances: [], location: this.config.host.id }); + this.sequenceStore.set({ id: config.id, parent_id: config.parent_id, config: config, instances: [], location: this.config.host.id }); } else { - this.sequenceStore.set({ id: config.id, config: config, instances: [], location: "STH" }); + this.sequenceStore.set({ id: config.id, parent_id: config.parent_id, config: config, instances: [], location: "STH" }); } } this.logger.info(` ${configs.length} sequences identified`); @@ -760,10 +760,10 @@ export class Host implements IComponent { async handleIncomingSequence( stream: ParsedMessage, - id: string + id: string, + external?: boolean ): Promise> { stream.params ||= {}; - const sequenceName = stream.params.id_name || stream.headers["x-name"]; this.logger.info("New Sequence incoming", { name: sequenceName }); @@ -794,22 +794,29 @@ export class Host implements IComponent { id = existingSequence.id; } } + const parentId = id; + + if (external) { + id = IDProvider.generate(); + } - const config = await sequenceAdapter.identify(stream, id); + const config = await sequenceAdapter.identify(stream, id, false, parentId); config.packageSize = stream.socket?.bytesRead; if (this.config.host.id) { // eslint-disable-next-line max-len - this.sequenceStore.set({ id, config, instances: [], name: sequenceName, location: this.config.host.id }); + this.sequenceStore.set({ id, parent_id: config.parent_id, config, instances: [], name: sequenceName, location: this.config.host.id }); } else { - this.sequenceStore.set({ id, config, instances: [], name: sequenceName, location: "STH" }); + this.sequenceStore.set({ id, parent_id: config.parent_id, config, instances: [], name: sequenceName, location: "STH" }); } this.logger.trace(`Sequence identified: ${config.id}`); - // eslint-disable-next-line max-len - await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as GetSequenceResponse); + await this.cpmConnector?.sendSequenceInfo( + id, // parentId + SequenceMessageCode.SEQUENCE_CREATED, + config as unknown as GetSequenceResponse,); this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_CREATED); this.pushTelemetry("Sequence uploaded", { language: config.language.toLowerCase(), seqId: id }); @@ -854,14 +861,16 @@ export class Host implements IComponent { * * @param {IncomingMessage} stream Stream of packaged Sequence. * @param {string} id Sequence id. + * @param {boolean} external Define the source of sequence. * @returns {Promise} Promise resolving to operation result. */ - async handleNewSequence(stream: ParsedMessage, id = IDProvider.generate()): + async handleNewSequence(stream: ParsedMessage, id = IDProvider.generate(), external?: boolean): Promise> { const sequenceName = stream.headers["x-name"] as string; if (sequenceName) { const existingSequence = this.sequenceStore.getByNameOrId(sequenceName); + console.log("handler:", existingSequence) if (existingSequence) { this.logger.debug("Method not allowed", sequenceName, existingSequence.id); @@ -872,8 +881,10 @@ export class Host implements IComponent { }; } } - - return this.handleIncomingSequence(stream, id); + console.log("==============================="); + console.log("handlerID:", id); + console.log("==============================="); + return this.handleIncomingSequence(stream, id, external); } async getExternalSequence(id: string): Promise { @@ -895,7 +906,8 @@ export class Host implements IComponent { const result = (await this.handleNewSequence( packageStream as ParsedMessage, - id + id, + true )) as STHRestAPI.SendSequenceResponse; return this.sequenceStore.getById(result.id)!; @@ -1154,6 +1166,7 @@ export class Host implements IComponent { return { opStatus: ReasonPhrases.OK, id: sequence.id, + parent_id: sequence.parent_id, name: sequence.name, config: sequence.config, location: sequence.location, @@ -1168,7 +1181,6 @@ export class Host implements IComponent { */ getSequences(): STHRestAPI.GetSequencesResponse { this.logger.info("List Sequences"); - return this.sequenceStore.sequences; } diff --git a/packages/types/src/messages/sequence.ts b/packages/types/src/messages/sequence.ts index 86da1d40b..60d9af6d5 100644 --- a/packages/types/src/messages/sequence.ts +++ b/packages/types/src/messages/sequence.ts @@ -4,7 +4,8 @@ import { SequenceConfig } from "../runner-config"; export type SequenceMessageData = { id: string, status: SequenceMessageCode, - config: SequenceConfig + config: SequenceConfig, + parent_id?: string } export type SequenceMessage = { msgCode: CPMMessageCode.SEQUENCE } & SequenceMessageData; diff --git a/packages/types/src/rest-api-manager/get-sequence.ts b/packages/types/src/rest-api-manager/get-sequence.ts index ed7231c0d..43cde6293 100644 --- a/packages/types/src/rest-api-manager/get-sequence.ts +++ b/packages/types/src/rest-api-manager/get-sequence.ts @@ -4,6 +4,7 @@ export type GetSequenceResponse = { instances: string[]; name?: string; id: string; + parent_id: string; config: SequenceConfig; location : string; } diff --git a/packages/types/src/rest-api-sth/get-sequence.ts b/packages/types/src/rest-api-sth/get-sequence.ts index 781ba9a1d..e10df9447 100644 --- a/packages/types/src/rest-api-sth/get-sequence.ts +++ b/packages/types/src/rest-api-sth/get-sequence.ts @@ -2,6 +2,7 @@ import { SequenceConfig } from "../runner-config"; export type GetSequenceResponse = { id: string; + parent_id: string; name?: string; config: SequenceConfig, location: string, diff --git a/packages/types/src/runner-config.ts b/packages/types/src/runner-config.ts index 251341012..27a0f2003 100644 --- a/packages/types/src/runner-config.ts +++ b/packages/types/src/runner-config.ts @@ -9,6 +9,7 @@ type CommonSequenceConfig = { type: string; engines: Record; id: string; + parent_id: string; /** * Relative path from sequence package root to JS file */ diff --git a/packages/types/src/sequence-adapter.ts b/packages/types/src/sequence-adapter.ts index 86a2d966e..345407341 100644 --- a/packages/types/src/sequence-adapter.ts +++ b/packages/types/src/sequence-adapter.ts @@ -8,6 +8,7 @@ export type SequenceInfo = { instances: string[]; location : string; name? : string; + parent_id: string; } export type SequenceInfoInstance = Omit; @@ -28,7 +29,7 @@ export interface ISequenceAdapter { /** * Identifies new Sequence */ - identify(stream: Readable, id: string, override?: boolean): Promise; + identify(stream: Readable, id: string, override?: boolean, parent_id?: string,): Promise; remove(conifg: SequenceConfig): Promise From 78881c09878a0f6af98d379fe78847a698f96056 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Wed, 30 Aug 2023 10:03:34 +0000 Subject: [PATCH 02/11] Fixed csi-controller --- packages/host/src/lib/csi-controller.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index da3e4114f..e2ac0a724 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -789,7 +789,8 @@ export class CSIController extends TypedEmitter { id: this.sequence.id, config: this.sequence.config, name: this.sequence.name, - location : this.sequence.location + location : this.sequence.location, + parent_id: this.sequence.parent_id }, ports: this.info.ports, created: this.info.created, From cfe2b96d80ccc07afbab510fd09c1f387fc510fd Mon Sep 17 00:00:00 2001 From: gzukowski Date: Wed, 30 Aug 2023 13:40:06 +0000 Subject: [PATCH 03/11] Support for process adapter --- .../adapters/src/process-sequence-adapter.ts | 28 +++++++++++++------ packages/host/src/lib/host.ts | 12 +++++++- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index 7696df7a2..7ebaae4a1 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -23,15 +23,21 @@ import { detectLanguage } from "./utils"; * @param {string} id Sequence Id. * @returns {ProcessSequenceConfig} Sequence configuration. */ -// eslint-disable-next-line complexity -async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise { - const sequenceDir = path.join(sequencesRoot, id); +// eslint-disable-next-line complexity, max-len +async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise { + let sequenceDir: string; + + if (parentId) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + [id, parentId] = id.split("_"); + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } const packageJsonPath = path.join(sequenceDir, "package.json"); const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); - const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; - console.log("runner: ", validPackageJson); + return { type: "process", engines, @@ -39,7 +45,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin version: validPackageJson.version ?? "", name: validPackageJson.name ?? "", id, - parent_id: id, + parent_id: parentId || id, sequenceDir, description: validPackageJson.description, author: validPackageJson.author, @@ -83,6 +89,8 @@ class ProcessSequenceAdapter implements ISequenceAdapter { */ async list(): Promise { const storedSequencesIds = await fs.readdir(this.config.sequencesRoot); + + console.log(storedSequencesIds); const sequencesConfigs = (await Promise.all( storedSequencesIds .map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id)) @@ -105,8 +113,10 @@ class ProcessSequenceAdapter implements ISequenceAdapter { * @returns {Promise} Promise resolving to identified sequence configuration. */ - async identify(stream: Readable, id: string,override = false, parentId?: string): Promise { - const sequenceDir = path.join(this.config.sequencesRoot, id); + async identify(stream: Readable, id: string, override = false, parentId = id): Promise { + const sequenceDir = path.join(this.config.sequencesRoot, id + "_" + parentId); + + console.log("DIRR: ", sequenceDir); if (override) { await fs.rm(sequenceDir, { recursive: true, force: true }); @@ -143,7 +153,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter { this.logger.debug("Unpacking sequence succeeded", stderrOutput); - return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id); + return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id, parentId); } /** diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 98526529b..67484f69d 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -742,6 +742,8 @@ export class Host implements IComponent { const configs = await sequenceAdapter.list(); + sequenceAdapter.logger.pipe(this.logger); + for (const config of configs) { this.logger.trace(`Sequence identified: ${config.id}`); @@ -799,9 +801,11 @@ export class Host implements IComponent { if (external) { id = IDProvider.generate(); } + console.log("\n", "przed configuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu"); const config = await sequenceAdapter.identify(stream, id, false, parentId); + console.log("\n", "po configuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu"); config.packageSize = stream.socket?.bytesRead; if (this.config.host.id) { @@ -870,7 +874,8 @@ export class Host implements IComponent { if (sequenceName) { const existingSequence = this.sequenceStore.getByNameOrId(sequenceName); - console.log("handler:", existingSequence) + + console.log("handler:", existingSequence); if (existingSequence) { this.logger.debug("Method not allowed", sequenceName, existingSequence.id); @@ -945,6 +950,7 @@ export class Host implements IComponent { return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, error: "Invalid Instance id" }; } +<<<<<<< HEAD if (this.instancesStore[payload.instanceId]) { return { opStatus: ReasonPhrases.CONFLICT, @@ -952,6 +958,10 @@ export class Host implements IComponent { }; } } +======= + console.log(sequence); + } +>>>>>>> 6f070b71... Support for process adapter let sequence = this.sequenceStore.getByNameOrId(sequenceId); From fbf8a9623b6f43585526d5c41fa06180aa304cae Mon Sep 17 00:00:00 2001 From: gzukowski Date: Wed, 30 Aug 2023 13:51:19 +0000 Subject: [PATCH 04/11] Kubernetesconfig support added --- .../src/kubernetes-sequence-adapter.ts | 20 +++++++++++++------ .../adapters/src/process-sequence-adapter.ts | 4 ---- packages/host/src/lib/host.ts | 16 +++++++-------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/packages/adapters/src/kubernetes-sequence-adapter.ts b/packages/adapters/src/kubernetes-sequence-adapter.ts index b9281fab4..6691b4858 100644 --- a/packages/adapters/src/kubernetes-sequence-adapter.ts +++ b/packages/adapters/src/kubernetes-sequence-adapter.ts @@ -24,8 +24,16 @@ import { detectLanguage } from "./utils"; * @param {string} id Sequence Id. * @returns {ProcessSequenceConfig} Sequence configuration. */ -async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise { - const sequenceDir = path.join(sequencesRoot, id); +// eslint-disable-next-line max-len +async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise { + let sequenceDir: string; + + if (parentId) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + [id, parentId] = id.split("_"); + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } const packageJsonPath = path.join(sequenceDir, "package.json"); const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); @@ -38,7 +46,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin version: validPackageJson.version ?? "", name: validPackageJson.name ?? "", id, - parent_id: id, + parent_id: parentId || id, sequenceDir, engines, description: validPackageJson.description, @@ -116,10 +124,10 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { * @returns {Promise} Promise resolving to identified sequence configuration. */ - async identify(stream: Readable, id: string, override = false, parentId?: string,): Promise { + async identify(stream: Readable, id: string, override = false, parentId = id,): Promise { // 1. Unpack package.json to stdout and map to config // 2. Create compressed package on the disk - const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id); + const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id + "_" + parentId); if (override) { await fs.rm(sequenceDir, { recursive: true, force: true }); @@ -137,7 +145,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { await new Promise(res => uncompressingProc.on("close", res)); - return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id); + return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id, parentId); } /** diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index 7ebaae4a1..2924f3240 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -89,8 +89,6 @@ class ProcessSequenceAdapter implements ISequenceAdapter { */ async list(): Promise { const storedSequencesIds = await fs.readdir(this.config.sequencesRoot); - - console.log(storedSequencesIds); const sequencesConfigs = (await Promise.all( storedSequencesIds .map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id)) @@ -116,8 +114,6 @@ class ProcessSequenceAdapter implements ISequenceAdapter { async identify(stream: Readable, id: string, override = false, parentId = id): Promise { const sequenceDir = path.join(this.config.sequencesRoot, id + "_" + parentId); - console.log("DIRR: ", sequenceDir); - if (override) { await fs.rm(sequenceDir, { recursive: true, force: true }); } diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 67484f69d..149907bd4 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -801,11 +801,8 @@ export class Host implements IComponent { if (external) { id = IDProvider.generate(); } - console.log("\n", "przed configuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu"); - const config = await sequenceAdapter.identify(stream, id, false, parentId); - console.log("\n", "po configuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu"); config.packageSize = stream.socket?.bytesRead; if (this.config.host.id) { @@ -875,8 +872,6 @@ export class Host implements IComponent { if (sequenceName) { const existingSequence = this.sequenceStore.getByNameOrId(sequenceName); - console.log("handler:", existingSequence); - if (existingSequence) { this.logger.debug("Method not allowed", sequenceName, existingSequence.id); @@ -886,9 +881,6 @@ export class Host implements IComponent { }; } } - console.log("==============================="); - console.log("handlerID:", id); - console.log("==============================="); return this.handleIncomingSequence(stream, id, external); } @@ -945,6 +937,7 @@ export class Host implements IComponent { const sequenceId = req.params?.id as string; const payload = req.body || ({} as STHRestAPI.StartSequencePayload); +<<<<<<< HEAD if (payload.instanceId) { if (!isStartSequenceEndpointPayloadDTO(payload)) { return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, error: "Invalid Instance id" }; @@ -960,6 +953,13 @@ export class Host implements IComponent { } ======= console.log(sequence); +======= + if (this.cpmConnector?.connected) { + sequence ||= await this.getExternalSequence(sequenceId).catch((error: ReasonPhrases) => { + this.logger.error("Error getting sequence from external sources", error); + return undefined; + }); +>>>>>>> bea3057d... Kubernetesconfig support added } >>>>>>> 6f070b71... Support for process adapter From 060ac4381fece924cc9aa1bbc9d00cb76414d2fe Mon Sep 17 00:00:00 2001 From: gzukowski Date: Wed, 30 Aug 2023 14:10:42 +0000 Subject: [PATCH 05/11] Debug leftover deletion, comments --- packages/adapters/src/docker-sequence-adapter.ts | 11 ----------- packages/adapters/src/dockerode-docker-helper.ts | 10 +++++++--- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/packages/adapters/src/docker-sequence-adapter.ts b/packages/adapters/src/docker-sequence-adapter.ts index cc2b13562..242e5e439 100644 --- a/packages/adapters/src/docker-sequence-adapter.ts +++ b/packages/adapters/src/docker-sequence-adapter.ts @@ -149,12 +149,6 @@ class DockerSequenceAdapter implements ISequenceAdapter { } const volumeId = await this.createVolume(id, parentId); - //const parentId = await this.dockerHelper.getLabelValue(volumeId, "parentId"); - console.log("---------------------"); - console.log("IDENTIFY", id); - //console.log("VOLUME", parentId); - console.log("---------------------"); - const volSecs = (new Date().getTime() - volStart.getTime()) / 1000; appendFile("timing-log.ndjson", JSON.stringify({ @@ -249,9 +243,6 @@ class DockerSequenceAdapter implements ISequenceAdapter { const parseStart = new Date(); const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any; - - console.log("result: ", preRunnerResult); - const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000; appendFile("timing-log.ndjson", JSON.stringify({ @@ -267,7 +258,6 @@ class DockerSequenceAdapter implements ISequenceAdapter { throw new SequenceAdapterError("PRERUNNER_ERROR", preRunnerResult.error); } - console.log("volumy:", this.dockerHelper.listVolumes()); const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {}; @@ -278,7 +268,6 @@ class DockerSequenceAdapter implements ISequenceAdapter { ? this.config.docker.runnerImages.python3 : this.config.docker.runnerImages.node; - console.log("runner: ", validPackageJson); return { type: "docker", container, diff --git a/packages/adapters/src/dockerode-docker-helper.ts b/packages/adapters/src/dockerode-docker-helper.ts index 157e7a408..ff6f9d6b0 100644 --- a/packages/adapters/src/dockerode-docker-helper.ts +++ b/packages/adapters/src/dockerode-docker-helper.ts @@ -280,11 +280,15 @@ export class DockerodeDockerHelper implements IDockerHelper { filters: { label: { "org.scramjet.host.is-sequence": true } } }); - Volumes.forEach( - volume => { console.log(volume); }); return Volumes.map(volume => volume.Name); } - + /** + * Access to the value of volume's label + * @param volumeName Volume name. + * @param labelName Label name. + * + * @returns Promise which resolves when volume has been removed. + */ async getLabelValue(volumeName: string, labelName: string): Promise { try { // Get information about the Docker volume From e2346920ec41b738ebec7787e8fa3de4d70f4ab8 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Thu, 31 Aug 2023 08:42:54 +0000 Subject: [PATCH 06/11] Fixed sequence deletion --- packages/adapters/src/kubernetes-sequence-adapter.ts | 2 +- packages/adapters/src/process-sequence-adapter.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/adapters/src/kubernetes-sequence-adapter.ts b/packages/adapters/src/kubernetes-sequence-adapter.ts index 6691b4858..78478c053 100644 --- a/packages/adapters/src/kubernetes-sequence-adapter.ts +++ b/packages/adapters/src/kubernetes-sequence-adapter.ts @@ -159,7 +159,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { throw new Error(`Incorrect SequenceConfig passed to KubernetesSequenceAdapter: ${config.type}`); } - const sequenceDir = path.join(this.adapterConfig.sequencesRoot, config.id); + const sequenceDir = config.sequenceDir; this.logger.debug("Removing sequence directory...", sequenceDir); diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index 2924f3240..df4fd4b22 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -163,7 +163,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter { throw new Error(`Incorrect SequenceConfig passed to ProcessSequenceAdapter: ${config.type}`); } - const sequenceDir = path.join(this.config.sequencesRoot, config.id); + const sequenceDir = config.sequenceDir; return fs.rm(sequenceDir, { recursive: true }); } From 77d3efefdccfa4c4ef103b8ba97b93c52a79d351 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Mon, 4 Sep 2023 08:51:10 +0000 Subject: [PATCH 07/11] Changed the approach of handling docker-adapter --- packages/adapters/src/docker-sequence-adapter.ts | 11 +++++++---- packages/adapters/src/dockerode-docker-helper.ts | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/adapters/src/docker-sequence-adapter.ts b/packages/adapters/src/docker-sequence-adapter.ts index 242e5e439..28222611e 100644 --- a/packages/adapters/src/docker-sequence-adapter.ts +++ b/packages/adapters/src/docker-sequence-adapter.ts @@ -108,7 +108,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0); const ret = await this.parsePackage(streams, wait, volume); - const parentId = await this.dockerHelper.getLabelValue(volume, "parentId"); + const [, parentId] = volume.split("_"); if (!ret.id) { return undefined; @@ -261,6 +261,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {}; + const [id, parentId] = volumeId.split("_"); const container = Object.assign({}, this.config.docker.runner); @@ -277,8 +278,8 @@ class DockerSequenceAdapter implements ISequenceAdapter { config, sequenceDir: PACKAGE_DIR, entrypointPath: validPackageJson.main, - id: volumeId, - parent_id: volumeId, + id: id, + parent_id: parentId, description: validPackageJson.description, author: validPackageJson.author, keywords: validPackageJson.keywords, @@ -298,7 +299,9 @@ class DockerSequenceAdapter implements ISequenceAdapter { throw new Error(`Incorrect SequenceConfig passed to DockerSequenceAdapter: ${config.type}`); } - await this.dockerHelper.removeVolume(config.id); + const volumeId = config.id + "_" + config.parent_id; + + await this.dockerHelper.removeVolume(volumeId); this.logger.debug("Volume removed", config.id); } diff --git a/packages/adapters/src/dockerode-docker-helper.ts b/packages/adapters/src/dockerode-docker-helper.ts index ff6f9d6b0..64dd7d9dc 100644 --- a/packages/adapters/src/dockerode-docker-helper.ts +++ b/packages/adapters/src/dockerode-docker-helper.ts @@ -250,15 +250,15 @@ export class DockerodeDockerHelper implements IDockerHelper { * Creates docker volume. * * @param name Volume name. Optional. If not provided, volume will be named with unique name. - * @param parentId Volume parentId. Optional. If not provided, volume will benamed the same as the name. + * @param parentId Volume parentId. Optional. If not provided, volume will be named the same as the name. * @returns Volume name. */ async createVolume(name: string = "", parentId: string = name): Promise { + name = name + "_" + parentId; return this.dockerode.createVolume({ Name: name, Labels: { "org.scramjet.host.is-sequence": "true", - parentId : parentId } }).then((volume: Dockerode.Volume) => { return volume.name; From 2b7bb9db21df44fd3337046b413052d530ed2952 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Mon, 4 Sep 2023 09:36:04 +0000 Subject: [PATCH 08/11] Fix for the wrong detection of volume --- packages/adapters/src/docker-instance-adapter.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/adapters/src/docker-instance-adapter.ts b/packages/adapters/src/docker-instance-adapter.ts index 760aaa719..3f7be6857 100644 --- a/packages/adapters/src/docker-instance-adapter.ts +++ b/packages/adapters/src/docker-instance-adapter.ts @@ -186,6 +186,8 @@ IComponent { const networkSetup = await this.getNetworkSetup(); + const volumeId = config.id + "_" + config.parent_id; + const envs = getRunnerEnvEntries({ sequencePath: path.join(config.sequenceDir, config.entrypointPath), instancesServerPort, @@ -200,7 +202,7 @@ IComponent { imageName: config.container.image, volumes: [ ...extraVolumes, - { mountPoint: config.sequenceDir, volume: config.id, writeable: false } + { mountPoint: config.sequenceDir, volume: volumeId, writeable: false } ], labels: { "scramjet.sequence.name": config.name From a756f40f6ff0df38ca0ba1a051080527b491c65a Mon Sep 17 00:00:00 2001 From: gzukowski Date: Tue, 5 Sep 2023 13:59:55 +0000 Subject: [PATCH 09/11] Fixed the personally named sequences --- packages/adapters/src/kubernetes-sequence-adapter.ts | 9 ++++++++- packages/adapters/src/process-sequence-adapter.ts | 11 +++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/packages/adapters/src/kubernetes-sequence-adapter.ts b/packages/adapters/src/kubernetes-sequence-adapter.ts index 78478c053..a4d261191 100644 --- a/packages/adapters/src/kubernetes-sequence-adapter.ts +++ b/packages/adapters/src/kubernetes-sequence-adapter.ts @@ -16,6 +16,7 @@ import { isDefined, readStreamedJSON } from "@scramjet/utility"; import { sequencePackageJSONDecoder } from "./validate-sequence-package-json"; import { adapterConfigDecoder } from "./kubernetes-config-decoder"; import { detectLanguage } from "./utils"; +import { IDProvider } from "@scramjet/model"; /** * Returns existing Sequence configuration. @@ -32,7 +33,13 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin sequenceDir = path.join(sequencesRoot, id + "_" + parentId); } else { [id, parentId] = id.split("_"); - sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + const valid = IDProvider.isValid(id); + + if (valid) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + sequenceDir = path.join(sequencesRoot, id); + } } const packageJsonPath = path.join(sequenceDir, "package.json"); const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index df4fd4b22..b46fba600 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -13,7 +13,7 @@ import path from "path"; import { exec } from "child_process"; import { isDefined, readStreamedJSON } from "@scramjet/utility"; import { sequencePackageJSONDecoder } from "./validate-sequence-package-json"; -import { SequenceAdapterError } from "@scramjet/model"; +import { IDProvider, SequenceAdapterError } from "@scramjet/model"; import { detectLanguage } from "./utils"; /** @@ -31,7 +31,13 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin sequenceDir = path.join(sequencesRoot, id + "_" + parentId); } else { [id, parentId] = id.split("_"); - sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + const valid = IDProvider.isValid(id); + + if (valid) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + sequenceDir = path.join(sequencesRoot, id); + } } const packageJsonPath = path.join(sequenceDir, "package.json"); const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); @@ -89,6 +95,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter { */ async list(): Promise { const storedSequencesIds = await fs.readdir(this.config.sequencesRoot); + const sequencesConfigs = (await Promise.all( storedSequencesIds .map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id)) From ebb2d98d409290813083161e087b70e0e7ec5748 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Fri, 22 Sep 2023 07:28:55 +0000 Subject: [PATCH 10/11] Added suggested changes --- .../adapters/src/dockerode-docker-helper.ts | 22 +------------------ packages/adapters/src/types.ts | 11 +--------- packages/host/src/lib/host.ts | 4 +--- 3 files changed, 3 insertions(+), 34 deletions(-) diff --git a/packages/adapters/src/dockerode-docker-helper.ts b/packages/adapters/src/dockerode-docker-helper.ts index 64dd7d9dc..123cec0ec 100644 --- a/packages/adapters/src/dockerode-docker-helper.ts +++ b/packages/adapters/src/dockerode-docker-helper.ts @@ -282,27 +282,7 @@ export class DockerodeDockerHelper implements IDockerHelper { return Volumes.map(volume => volume.Name); } - /** - * Access to the value of volume's label - * @param volumeName Volume name. - * @param labelName Label name. - * - * @returns Promise which resolves when volume has been removed. - */ - async getLabelValue(volumeName: string, labelName: string): Promise { - try { - // Get information about the Docker volume - const volumeInfo = await this.dockerode.getVolume(volumeName).inspect(); - - // Access the labels property and retrieve the specific label - const labelValue = volumeInfo.Labels ? volumeInfo.Labels[labelName] : null; - - return labelValue; - } catch (error) { - this.logger.error(`Error reading Docker volume label: ${error}`); - return null; - } - } + /** * Attaches to container streams. * diff --git a/packages/adapters/src/types.ts b/packages/adapters/src/types.ts index b384547ff..537fce146 100644 --- a/packages/adapters/src/types.ts +++ b/packages/adapters/src/types.ts @@ -250,16 +250,7 @@ export interface IDockerHelper { * @returns {Promise} List of existing volumes */ listVolumes: () => Promise; - /** - * Get value of a certain label in scramjet's volume with the proviede id - * - * @param {string} volumeName Volume name. - * - * @param {string} labelName label name. - * - * @returns {Promise} The value of a label (null if it doesnt exist) - */ - getLabelValue: (volumeName: string, labelName: string) => Promise + /** * Creates volume. * diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 149907bd4..b43a9d6b3 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -742,8 +742,6 @@ export class Host implements IComponent { const configs = await sequenceAdapter.list(); - sequenceAdapter.logger.pipe(this.logger); - for (const config of configs) { this.logger.trace(`Sequence identified: ${config.id}`); @@ -815,7 +813,7 @@ export class Host implements IComponent { this.logger.trace(`Sequence identified: ${config.id}`); await this.cpmConnector?.sendSequenceInfo( - id, // parentId + id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as GetSequenceResponse,); From 786f9efd090ef48da99aee1632ddef0e764be7cb Mon Sep 17 00:00:00 2001 From: gzukowski Date: Mon, 6 Nov 2023 19:17:38 +0000 Subject: [PATCH 11/11] . --- packages/host/src/lib/host.ts | 114 +++++++++------------------------- 1 file changed, 30 insertions(+), 84 deletions(-) diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index b43a9d6b3..ccec68ba8 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -10,10 +10,8 @@ import { CPMConnectorOptions, HostProxy, IComponent, - IMonitoringServerConstructor, IObjectLogger, LogLevel, - MonitoringServerConfig, NextCallback, OpResponse, ParsedMessage, @@ -41,7 +39,7 @@ import { DataStream } from "scramjet"; import { optionsMiddleware } from "./middlewares/options"; import { corsMiddleware } from "./middlewares/cors"; import { ConfigService, development } from "@scramjet/sth-config"; -import { isStartSequenceDTO, isStartSequenceEndpointPayloadDTO, readJsonFile, defer, FileBuilder } from "@scramjet/utility"; +import { isStartSequenceDTO, readJsonFile, defer, FileBuilder } from "@scramjet/utility"; import { inspect } from "util"; import { auditMiddleware, logger as auditMiddlewareLogger } from "./middlewares/audit"; import { AuditedRequest, Auditor } from "./auditor"; @@ -55,7 +53,6 @@ import TopicRouter from "./serviceDiscovery/topicRouter"; import { ContentType } from "./serviceDiscovery/contentType"; import SequenceStore from "./sequenceStore"; import { GetSequenceResponse } from "@scramjet/types/src/rest-api-sth"; -import { loadModule, logger as loadModuleLogger } from "@scramjet/module-loader"; const buildInfo = readJsonFile("build.info", __dirname, ".."); const packageFile = findPackage(__dirname).next(); @@ -213,26 +210,13 @@ export class Host implements IComponent { if (isDevelopment) this.logger.info("config", this.config); - this.logger.info("Node version:", process.version); - - loadModuleLogger.pipe(this.logger); - this.config.host.id ||= this.getId(); this.logger.updateBaseLog({ id: this.config.host.id }); this.serviceDiscovery = new ServiceDiscovery(this.logger, this.config.host.hostname); - if (sthConfig.telemetry.environment) { + if (sthConfig.telemetry.environment) this.telemetryEnvironmentName = sthConfig.telemetry.environment; - } - - if (sthConfig.monitorgingServer) { - this.startMonitoringServer(sthConfig.monitorgingServer).then((res) => { - this.logger.info("MonitoringServer started", res); - }, (e) => { - throw new Error(e); - }); - } this.auditor = new Auditor(); //this.auditor.logger.pipe(this.logger); @@ -266,19 +250,6 @@ export class Host implements IComponent { } } - private async startMonitoringServer(config: MonitoringServerConfig): Promise { - const { MonitoringServer } = await loadModule<{ MonitoringServer: IMonitoringServerConstructor }>({ name: "@scramjet/monitoring-server" }); - - this.logger.info("Starting monitoring server with config", config); - - const monitoringServer = new MonitoringServer({ - ...config, - check: async () => !!await this.loadCheck.getLoadCheck() - }); - - return monitoringServer.start(); - } - getId() { let id = this.config.host.id; @@ -925,57 +896,44 @@ export class Host implements IComponent { */ // eslint-disable-next-line complexity async handleStartSequence(req: ParsedMessage): Promise> { - try { - if (await this.loadCheck.overloaded()) { + if (await this.loadCheck.overloaded()) { + return { + opStatus: ReasonPhrases.INSUFFICIENT_SPACE_ON_RESOURCE, + }; + } + + const sequenceId = req.params?.id as string; + const payload = req.body || ({} as STHRestAPI.StartSequencePayload); + + if (payload.instanceId) { + if (!IDProvider.isValid(payload.instanceId)) { + return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, error: "Invalid Instance id" }; + } + + if (this.instancesStore[payload.instanceId]) { return { - opStatus: ReasonPhrases.INSUFFICIENT_SPACE_ON_RESOURCE, + opStatus: ReasonPhrases.CONFLICT, + error: "Instance with a given ID already exists" }; } + } - const sequenceId = req.params?.id as string; - const payload = req.body || ({} as STHRestAPI.StartSequencePayload); + let sequence = this.sequenceStore.getByNameOrId(sequenceId); -<<<<<<< HEAD - if (payload.instanceId) { - if (!isStartSequenceEndpointPayloadDTO(payload)) { - return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, error: "Invalid Instance id" }; - } - -<<<<<<< HEAD - if (this.instancesStore[payload.instanceId]) { - return { - opStatus: ReasonPhrases.CONFLICT, - error: "Instance with a given ID already exists" - }; - } - } -======= - console.log(sequence); -======= if (this.cpmConnector?.connected) { sequence ||= await this.getExternalSequence(sequenceId).catch((error: ReasonPhrases) => { this.logger.error("Error getting sequence from external sources", error); return undefined; }); ->>>>>>> bea3057d... Kubernetesconfig support added } ->>>>>>> 6f070b71... Support for process adapter - - let sequence = this.sequenceStore.getByNameOrId(sequenceId); - - if (this.cpmConnector?.connected) { - sequence ||= await this.getExternalSequence(sequenceId).catch((error: ReasonPhrases) => { - this.logger.error("Error getting sequence from external sources", error); - return undefined; - }); - } - if (!sequence) { - return { opStatus: ReasonPhrases.NOT_FOUND }; - } + if (!sequence) { + return { opStatus: ReasonPhrases.NOT_FOUND }; + } - this.logger.info("Start sequence", sequence.id, sequence.config.name); + this.logger.info("Start sequence", sequence.id, sequence.config.name); + try { const csic = await this.startCSIController(sequence, payload); await this.cpmConnector?.sendInstanceInfo({ @@ -1009,10 +967,10 @@ export class Host implements IComponent { }; } catch (error: any) { this.pushTelemetry("Instance start failed", { error: error.message }, "error"); - this.logger.error(error.message); + return { opStatus: ReasonPhrases.BAD_REQUEST, - error: error.message + error: error, }; } } @@ -1045,22 +1003,10 @@ export class Host implements IComponent { // eslint-disable-next-line complexity csic.on("pang", async (data) => { - this.logger.trace("PANG received", [{ ...data }]); + this.logger.trace("PANG received", data); if ((data.requires || data.provides) && !data.contentType) { - this.logger.warn("Missing topic content-type", data.provides, data.contentType); - - if (data.provides) { - data.contentType = this.serviceDiscovery.getTopics() - .find(t => t.topic === data.provides)?.contentType; - } - - if (data.contentType) { - this.logger.warn("Content-type set to match existing topic", data.contentType); - } else { - data.contentType = "application/x-ndjson"; - this.logger.warn("Content-type set to default", data.contentType); - } + this.logger.warn("Missing topic content-type"); } if (data.requires && !csic.inputRouted && data.contentType) {