Skip to content
4 changes: 3 additions & 1 deletion packages/adapters/src/docker-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ IComponent {

const networkSetup = await this.getNetworkSetup();

const volumeId = config.id + "_" + config.parent_id;

const envs = getRunnerEnvEntries({
sequencePath: path.join(config.sequenceDir, config.entrypointPath),
instancesServerPort,
Expand All @@ -200,7 +202,7 @@ IComponent {
imageName: config.container.image,
volumes: [
...extraVolumes,
{ mountPoint: config.sequenceDir, volume: config.id, writeable: false }
{ mountPoint: config.sequenceDir, volume: volumeId, writeable: false }
],
labels: {
"scramjet.sequence.name": config.name
Expand Down
29 changes: 21 additions & 8 deletions packages/adapters/src/docker-sequence-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,18 @@ class DockerSequenceAdapter implements ISequenceAdapter {
this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0);

const ret = await this.parsePackage(streams, wait, volume);
const [, parentId] = volume.split("_");

if (!ret.id) {
return undefined;
}

this.logger.info("Identified image for volume", { volume, image: ret.container?.image });

if (parentId) {
ret.parent_id = parentId;
}

return ret;
} catch (e: any) {
this.logger.error("Docker failed", e.message, volume);
Expand All @@ -132,17 +137,18 @@ class DockerSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream containing sequence to be identified.
* @param {string} id Id for the new docker volume where sequence will be stored.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Id which indicates sequence's source.

* @returns {Promise<SequenceConfig>} Promise resolving to sequence config.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
async identify(stream: Readable, id: string, override = false, parentId: string): Promise<SequenceConfig> {
const volStart = new Date();

if (override) {
await this.dockerHelper.removeVolume(id);
}

const volumeId = await this.createVolume(id);

const volumeId = await this.createVolume(id, parentId);
const volSecs = (new Date().getTime() - volStart.getTime()) / 1000;

appendFile("timing-log.ndjson", JSON.stringify({
Expand Down Expand Up @@ -188,6 +194,9 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const config = await this.parsePackage(streams, wait, volumeId);

await this.fetch(config.container.image);
if (parentId) {
config.parent_id = parentId;
}

return config;
} catch (err: any) {
Expand All @@ -204,11 +213,12 @@ class DockerSequenceAdapter implements ISequenceAdapter {
* Creates volume with provided id.
*
* @param {string} id Volume id.
* @param {string} parentId Sequence's parentId.
* @returns {DockerVolume} Created volume.
*/
private async createVolume(id: string): Promise<DockerVolume> {
private async createVolume(id: string, parentId?: string): Promise<DockerVolume> {
try {
return await this.dockerHelper.createVolume(id);
return await this.dockerHelper.createVolume(id, parentId);
} catch (error: any) {
this.logger.error("Error creating volume", id);

Expand All @@ -233,7 +243,6 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const parseStart = new Date();

const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any;

const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000;

appendFile("timing-log.ndjson", JSON.stringify({
Expand All @@ -252,6 +261,7 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult);
const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {};
const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {};
const [id, parentId] = volumeId.split("_");

const container = Object.assign({}, this.config.docker.runner);

Expand All @@ -268,7 +278,8 @@ class DockerSequenceAdapter implements ISequenceAdapter {
config,
sequenceDir: PACKAGE_DIR,
entrypointPath: validPackageJson.main,
id: volumeId,
id: id,
parent_id: parentId,
description: validPackageJson.description,
author: validPackageJson.author,
keywords: validPackageJson.keywords,
Expand All @@ -288,7 +299,9 @@ class DockerSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to DockerSequenceAdapter: ${config.type}`);
}

await this.dockerHelper.removeVolume(config.id);
const volumeId = config.id + "_" + config.parent_id;

await this.dockerHelper.removeVolume(volumeId);

this.logger.debug("Volume removed", config.id);
}
Expand Down
6 changes: 4 additions & 2 deletions packages/adapters/src/dockerode-docker-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,15 @@ export class DockerodeDockerHelper implements IDockerHelper {
* Creates docker volume.
*
* @param name Volume name. Optional. If not provided, volume will be named with unique name.
* @param parentId Volume parentId. Optional. If not provided, volume will be named the same as the name.
* @returns Volume name.
*/
async createVolume(name: string = ""): Promise<DockerVolume> {
async createVolume(name: string = "", parentId: string = name): Promise<DockerVolume> {
name = name + "_" + parentId;
return this.dockerode.createVolume({
Name: name,
Labels: {
"org.scramjet.host.is-sequence": "true"
"org.scramjet.host.is-sequence": "true",
}
}).then((volume: Dockerode.Volume) => {
return volume.name;
Expand Down
30 changes: 24 additions & 6 deletions packages/adapters/src/kubernetes-sequence-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { isDefined, readStreamedJSON } from "@scramjet/utility";
import { sequencePackageJSONDecoder } from "./validate-sequence-package-json";
import { adapterConfigDecoder } from "./kubernetes-config-decoder";
import { detectLanguage } from "./utils";
import { IDProvider } from "@scramjet/model";

/**
* Returns existing Sequence configuration.
Expand All @@ -24,8 +25,22 @@ import { detectLanguage } from "./utils";
* @param {string} id Sequence Id.
* @returns {ProcessSequenceConfig} Sequence configuration.
*/
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise<KubernetesSequenceConfig> {
const sequenceDir = path.join(sequencesRoot, id);
// eslint-disable-next-line max-len
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise<KubernetesSequenceConfig> {
let sequenceDir: string;

if (parentId) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
[id, parentId] = id.split("_");
const valid = IDProvider.isValid(id);

if (valid) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
sequenceDir = path.join(sequencesRoot, id);
}
}
const packageJsonPath = path.join(sequenceDir, "package.json");
const packageJson = await readStreamedJSON(createReadStream(packageJsonPath));

Expand All @@ -38,6 +53,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin
version: validPackageJson.version ?? "",
name: validPackageJson.name ?? "",
id,
parent_id: parentId || id,
sequenceDir,
engines,
description: validPackageJson.description,
Expand Down Expand Up @@ -111,12 +127,14 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream with packed sequence.
* @param {string} id Sequence Id.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Sequence's parentId.

* @returns {Promise<SequenceConfig>} Promise resolving to identified sequence configuration.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
async identify(stream: Readable, id: string, override = false, parentId = id,): Promise<SequenceConfig> {
// 1. Unpack package.json to stdout and map to config
// 2. Create compressed package on the disk
const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id);
const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id + "_" + parentId);

if (override) {
await fs.rm(sequenceDir, { recursive: true, force: true });
Expand All @@ -134,7 +152,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {

await new Promise(res => uncompressingProc.on("close", res));

return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id);
return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id, parentId);
}

/**
Expand All @@ -148,7 +166,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to KubernetesSequenceAdapter: ${config.type}`);
}

const sequenceDir = path.join(this.adapterConfig.sequencesRoot, config.id);
const sequenceDir = config.sequenceDir;

this.logger.debug("Removing sequence directory...", sequenceDir);

Expand Down
34 changes: 25 additions & 9 deletions packages/adapters/src/process-sequence-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import path from "path";
import { exec } from "child_process";
import { isDefined, readStreamedJSON } from "@scramjet/utility";
import { sequencePackageJSONDecoder } from "./validate-sequence-package-json";
import { SequenceAdapterError } from "@scramjet/model";
import { IDProvider, SequenceAdapterError } from "@scramjet/model";
import { detectLanguage } from "./utils";

/**
Expand All @@ -23,12 +23,24 @@ import { detectLanguage } from "./utils";
* @param {string} id Sequence Id.
* @returns {ProcessSequenceConfig} Sequence configuration.
*/
// eslint-disable-next-line complexity
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise<ProcessSequenceConfig> {
const sequenceDir = path.join(sequencesRoot, id);
// eslint-disable-next-line complexity, max-len
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise<ProcessSequenceConfig> {
let sequenceDir: string;

if (parentId) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
[id, parentId] = id.split("_");
const valid = IDProvider.isValid(id);

if (valid) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
sequenceDir = path.join(sequencesRoot, id);
}
}
const packageJsonPath = path.join(sequenceDir, "package.json");
const packageJson = await readStreamedJSON(createReadStream(packageJsonPath));

const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson);
const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {};

Expand All @@ -39,6 +51,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin
version: validPackageJson.version ?? "",
name: validPackageJson.name ?? "",
id,
parent_id: parentId || id,
sequenceDir,
description: validPackageJson.description,
author: validPackageJson.author,
Expand Down Expand Up @@ -82,6 +95,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
*/
async list(): Promise<SequenceConfig[]> {
const storedSequencesIds = await fs.readdir(this.config.sequencesRoot);

const sequencesConfigs = (await Promise.all(
storedSequencesIds
.map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id))
Expand All @@ -100,10 +114,12 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream with packed sequence.
* @param {string} id Sequence Id.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Sequence's parentId.

* @returns {Promise<SequenceConfig>} Promise resolving to identified sequence configuration.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
const sequenceDir = path.join(this.config.sequencesRoot, id);
async identify(stream: Readable, id: string, override = false, parentId = id): Promise<SequenceConfig> {
const sequenceDir = path.join(this.config.sequencesRoot, id + "_" + parentId);

if (override) {
await fs.rm(sequenceDir, { recursive: true, force: true });
Expand Down Expand Up @@ -140,7 +156,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {

this.logger.debug("Unpacking sequence succeeded", stderrOutput);

return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id);
return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id, parentId);
}

/**
Expand All @@ -154,7 +170,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to ProcessSequenceAdapter: ${config.type}`);
}

const sequenceDir = path.join(this.config.sequencesRoot, config.id);
const sequenceDir = config.sequenceDir;

return fs.rm(sequenceDir, { recursive: true });
}
Expand Down
3 changes: 2 additions & 1 deletion packages/adapters/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ export type DockerAdapterRunResponse = {
containerId: DockerContainer
};
export interface IDockerHelper {
[x: string]: any;
logger: IObjectLogger;

/**
Expand Down Expand Up @@ -257,7 +258,7 @@ export interface IDockerHelper {
*
* @returns {Promise<DockerVolume>} Created volume.
*/
createVolume: (name?: string) => Promise<DockerVolume>;
createVolume: (name?: string, parentId?: string) => Promise<DockerVolume>;

/**
* Removes volume.
Expand Down
2 changes: 1 addition & 1 deletion packages/host/src/lib/cpm-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ export class CPMConnector extends TypedEmitter<Events> {
this.logger.trace("Send sequence status update", sequenceId, seqStatus);

await this.communicationStream?.whenWrote(
[CPMMessageCode.SEQUENCE, { id: sequenceId, status: seqStatus, config }]
[CPMMessageCode.SEQUENCE, { id: sequenceId, status: seqStatus, config: config, }]
);

this.logger.trace("Sequence status update sent", sequenceId, seqStatus);
Expand Down
3 changes: 2 additions & 1 deletion packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,8 @@ export class CSIController extends TypedEmitter<Events> {
id: this.sequence.id,
config: this.sequence.config,
name: this.sequence.name,
location : this.sequence.location
location : this.sequence.location,
parent_id: this.sequence.parent_id
},
ports: this.info.ports,
created: this.info.created,
Expand Down
Loading