Skip to content

Commit b75869b

Browse files
committed
pushing payload to runner/ docker run&disptach
1 parent 7ad6f74 commit b75869b

File tree

7 files changed

+44
-27
lines changed

7 files changed

+44
-27
lines changed

packages/adapters/src/docker-instance-adapter.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { STH_DOCKER_NETWORK, isHostSpawnedInDockerContainer, getHostname } from
2222
import { ObjLogger } from "@scramjet/obj-logger";
2323
import { getRunnerEnvEntries } from "./get-runner-env";
2424
import { Readable } from "stream";
25+
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
2526

2627
/**
2728
* Adapter for running Instance by Runner executed in Docker container.
@@ -168,13 +169,13 @@ IComponent {
168169
};
169170
}
170171

171-
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode> {
172-
await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo);
172+
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
173+
await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload);
173174
return this.waitUntilExit(config, instanceId, sequenceInfo);
174175
}
175176

176177
// eslint-disable-next-line complexity
177-
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<void> {
178+
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
178179
if (config.type !== "docker") {
179180
throw new Error("Docker instance adapter run with invalid runner config");
180181
}
@@ -198,7 +199,8 @@ IComponent {
198199
instancesServerHost: networkSetup.host,
199200
instanceId,
200201
pipesPath: "",
201-
sequenceInfo
202+
sequenceInfo,
203+
payload
202204
}).map(([k, v]) => `${k}=${v}`);
203205

204206
this.logger.debug("Runner will start with envs", envs);
@@ -228,7 +230,7 @@ IComponent {
228230
this.logger.trace("Container is running", containerId);
229231
}
230232

231-
async waitUntilExit(instanceId: string): Promise<number> {
233+
async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
232234
try {
233235
const containerId = await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", instanceId);
234236
const { statusCode } = await this.dockerHelper.wait(containerId);

packages/adapters/src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ExitCode, InstanceId, IObjectLogger, SequenceInfo } from "@scramjet/types";
2+
import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth";
23
import { ContainerStats, NetworkInspectInfo } from "dockerode";
34
import { PathLike } from "fs";
45
import { Stream, Writable } from "stream";
@@ -323,6 +324,7 @@ export type RunnerEnvConfig = {
323324
instancesServerHost: string;
324325
instanceId: InstanceId;
325326
sequenceInfo: SequenceInfo
327+
payload?: StartSequencePayload
326328
}
327329

328330
export type RunnerEnvironmentVariables = Partial<{

packages/host/src/lib/csi-controller.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
DownstreamStreamsConfig,
55
EncodedMessage,
66
HandshakeAcknowledgeMessage,
7-
ICommunicationHandler,
87
ParsedMessage,
98
PassThroughStreamsConfig,
109
ReadableStream,
@@ -22,13 +21,14 @@ import {
2221
OpResponse,
2322
StopSequenceMessageData,
2423
HostProxy,
24+
ICommunicationHandler,
2525
} from "@scramjet/types";
2626
import {
2727
AppError,
2828
CSIControllerError,
2929
HostError,
3030
MessageUtilities,
31-
InstanceAdapterError,
31+
InstanceAdapterError
3232
} from "@scramjet/model";
3333
import { CommunicationChannel as CC, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols";
3434
import { Duplex, PassThrough, Readable } from "stream";
@@ -165,18 +165,18 @@ export class CSIController extends TypedEmitter<Events> {
165165
public communicationHandler: ICommunicationHandler,
166166
private sthConfig: STHConfiguration,
167167
private hostProxy: HostProxy,
168-
private adapter: STHConfiguration["runtimeAdapter"] = sthConfig.runtimeAdapter
168+
private adapter: STHConfiguration["runtimeAdapter"] = sthConfig.runtimeAdapter,
169169
) {
170170
super();
171171

172172
this.id = this.handshakeMessage.id;
173-
this.sequence = this.handshakeMessage.sequenceInfo;
174-
this.appConfig = payload.appConfig;
175-
this.args = payload.args;
176-
this.outputTopic = payload.outputTopic;
177-
this.inputTopic = payload.inputTopic;
173+
this.sequence = this.handshakeMessage.sequence;
174+
this.appConfig = this.handshakeMessage.payload.appConfig;
175+
this.args = this.handshakeMessage.payload.args;
176+
this.outputTopic = this.handshakeMessage.payload.outputTopic;
177+
this.inputTopic = this.handshakeMessage.payload.inputTopic;
178178
this.limits = {
179-
memory: payload.limits?.memory || sthConfig.docker.runner.maxMem
179+
memory: handshakeMessage.payload.limits?.memory || sthConfig.docker.runner.maxMem
180180
};
181181

182182
this.instanceLifetimeExtensionDelay = +sthConfig.timings.instanceLifetimeExtensionDelay;
@@ -507,7 +507,7 @@ export class CSIController extends TypedEmitter<Events> {
507507
}
508508

509509
this.info.ports = message[1].ports;
510-
this.sequence = message[1].sequenceInfo;
510+
this.sequence = message[1].sequence;
511511

512512
// TODO: add message to initiate the instance adapter
513513

packages/host/src/lib/csi-dispatcher.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { ObjLogger } from "@scramjet/obj-logger";
2-
import { HostProxy, IObjectLogger, InstanceConfig, MessageDataType, STHConfiguration, STHRestAPI, SequenceInfo } from "@scramjet/types";
2+
import { HostProxy, ICommunicationHandler, IObjectLogger, InstanceConfig, MessageDataType, STHConfiguration, STHRestAPI, SequenceInfo } from "@scramjet/types";
33
import { SocketServer } from "./socket-server";
44
import { InstanceStore } from "./instance-store";
55
import { CSIController } from "./csi-controller";
6-
import { CommunicationHandler, IDProvider } from "@scramjet/model";
6+
import { IDProvider } from "@scramjet/model";
77
import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth";
88
import { getInstanceAdapter } from "@scramjet/adapters";
99
import { TypedEmitter } from "@scramjet/utility";
@@ -36,9 +36,15 @@ export class CSIDispatcher extends TypedEmitter<Events> {
3636
this.STHConfig = STHConfig;
3737
}
3838

39-
createCSIController(id: string, sequence: SequenceInfo, payload: StartSequencePayload, communicationHandler: CommunicationHandler, config: STHConfiguration, instanceProxy: HostProxy) {
39+
async createCSIController(
40+
id: string,
41+
sequence: SequenceInfo,
42+
payload: StartSequencePayload,
43+
communicationHandler: ICommunicationHandler,
44+
config: STHConfiguration,
45+
instanceProxy: HostProxy) {
4046
sequence.instances = sequence.instances || new Set();
41-
const csiController = new CSIController(id, sequence, payload, communicationHandler, config, instanceProxy);
47+
const csiController = new CSIController({ id, sequence, payload }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);
4248

4349
csiController.logger.pipe(this.logger);
4450
this.logger.trace("CSIController created", id);
@@ -132,7 +138,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
132138
this.emit("terminated", { id, code });
133139
});
134140

135-
csiController.start();
141+
await csiController.start();
136142

137143
this.logger.trace("csiController started", id);
138144

@@ -161,7 +167,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
161167
this.STHConfig.host.instancesServerPort,
162168
id,
163169
sequence,
164-
//payload
170+
payload
165171
);
166172
// @todo more instance info
167173
return {

packages/host/src/lib/host.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,7 @@ export class Host implements IComponent {
901901
this.logger.info("Start sequence", sequence.id, sequence.config.name);
902902

903903
try {
904-
const runner = this.csiDispatcher.startRunner(sequence, payload);
904+
const runner = await this.csiDispatcher.startRunner(sequence, payload);
905905

906906
// @todo more info
907907
await this.cpmConnector?.sendInstanceInfo({
@@ -948,7 +948,13 @@ export class Host implements IComponent {
948948
// @todo need more instance info
949949
if (!this.instancesStore[id]) {
950950
this.logger.info("creating new CSIController for runner connecting");
951-
this.csiDispatcher.createCSIController(id, {} as SequenceInfo, {} as STHRestAPI.StartSequencePayload, new CommunicationHandler(), this.config, this.instanceProxy);
951+
await this.csiDispatcher.createCSIController(
952+
id,
953+
{} as SequenceInfo,
954+
{} as STHRestAPI.StartSequencePayload,
955+
new CommunicationHandler(),
956+
this.config,
957+
this.instanceProxy);
952958
}
953959
await this.instancesStore[id].handleInstanceConnect(
954960
streams

packages/types/src/lifecycle-adapters.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { InstanceConfig } from "./runner-config";
44
import { IObjectLogger } from "./object-logger";
55
import { InstanceLimits } from "./instance-limits";
66
import { SequenceInfo } from "./sequence-adapter";
7+
import { RunnerConnectInfo } from "./runner-connect";
78

89
export type ExitCode = number;
910

@@ -30,7 +31,7 @@ export interface ILifeCycleAdapterMain {
3031

3132
getCrashLog(): Promise<string[]>;
3233

33-
waitUntilExit(instanceId: string): Promise<ExitCode>;
34+
waitUntilExit(config: InstanceConfig, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode>;
3435
}
3536
// @TODO create ISequenceAdapter interface
3637

@@ -43,15 +44,15 @@ export interface ILifeCycleAdapterRun extends ILifeCycleAdapterMain {
4344
* @param {InstanceConfig} Runner configuration.
4445
* @returns {ExitCode} Runner exit code.
4546
*/
46-
dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<void>;
47+
dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void>;
4748

4849
/**
4950
* Starts Runner - in essence does `dispatch` and then `waitUntilExit`.
5051
*
5152
* @param {InstanceConfig} Runner configuration.
5253
* @returns {ExitCode} Runner exit code.
5354
*/
54-
run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode>;
55+
run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode>;
5556
}
5657

5758
export type LifeCycleError = any | (Error & { exitCode?: number, errorMessage?: string });

packages/types/src/messages/handshake.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export type HandshakeMessage = {
1313
payload: StartSequencePayload
1414
};
1515

16-
export type PingMessageData = { id: string, ports?: Record<string, string>; sequenceInfo: SequenceInfo, payload: StartSequencePayload }
16+
export type PingMessageData = { id: string, ports?: Record<string, string>; sequence: SequenceInfo, payload: StartSequencePayload }
1717

1818
export type PangMessageData = {
1919
requires?: string,

0 commit comments

Comments
 (0)