Skip to content

Commit a542209

Browse files
committed
Reconnect. Fix starting instance
1 parent b75869b commit a542209

File tree

11 files changed

+87
-22
lines changed

11 files changed

+87
-22
lines changed

packages/adapters/src/get-runner-env.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { RunnerEnvConfig, RunnerEnvironmentVariables } from "./types";
99
* @returns env vars
1010
*/
1111
export function getRunnerEnvVariables({
12-
sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix", sequenceInfo
12+
sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix", sequenceInfo, payload
1313
}: RunnerEnvConfig, extra: Record<string, string> = {}): RunnerEnvironmentVariables {
1414
const join = path[paths].join;
1515

@@ -24,6 +24,7 @@ export function getRunnerEnvVariables({
2424
PIPES_LOCATION: pipesPath,
2525
CRASH_LOG: join(pipesPath, "crash_log"),
2626
SEQUENCE_INFO: JSON.stringify(sequenceInfo),
27+
RUNNER_CONNECT_INFO: JSON.stringify(payload),
2728
...extra
2829
};
2930
}

packages/adapters/src/kubernetes-instance-adapter.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { adapterConfigDecoder } from "./kubernetes-config-decoder";
2121
import { getRunnerEnvEntries } from "./get-runner-env";
2222
import { PassThrough } from "stream";
2323
import { RunnerExitCode } from "@scramjet/symbols";
24+
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
2425

2526
/**
2627
* Adapter for running Instance by Runner executed in separate process.
@@ -88,6 +89,9 @@ IComponent {
8889
}
8990
};
9091
}
92+
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
93+
throw Error("not implemented");
94+
}
9195

9296
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode> {
9397
if (config.type !== "kubernetes") {
@@ -181,6 +185,11 @@ IComponent {
181185
return 0;
182186
}
183187

188+
async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
189+
throw Error("Not implemented");
190+
}
191+
192+
184193
async cleanup(): Promise<void> {
185194
await this.remove(this.adapterConfig.timeout);
186195
}

packages/adapters/src/process-instance-adapter.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { ChildProcess, spawn } from "child_process";
1616

1717
import path from "path";
1818
import { getRunnerEnvVariables } from "./get-runner-env";
19+
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
1920

2021
const isTSNode = !!(process as any)[Symbol.for("ts-node.register.instance")];
2122
const gotPython = "\n _ \n __ _____ _ __ ___ ___| |\n \\ \\ /\\ / / _ \\| '_ \\/ __|_ / |\n \\ V V / (_) | | | \\__ \\/ /|_|\n \\_/\\_/ \\___/|_| |_|___/___(_) 🐍\n";
@@ -116,8 +117,11 @@ class ProcessInstanceAdapter implements
116117
return pythonpath;
117118
}
118119

120+
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
121+
throw Error("not implemented");
122+
}
123+
119124
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode> {
120-
console.log("config type", config.type)
121125
if (config.type !== "process") {
122126
throw new Error("Process instance adapter run with invalid runner config");
123127
}
@@ -177,6 +181,10 @@ class ProcessInstanceAdapter implements
177181
return statusCode;
178182
}
179183

184+
async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
185+
throw Error("Not implemented");
186+
}
187+
180188
/**
181189
* Performs cleanup after Runner end.
182190
* Removes fifos used to communication with runner.

packages/host/src/lib/csi-controller.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
OpResponse,
2222
StopSequenceMessageData,
2323
HostProxy,
24-
ICommunicationHandler,
24+
ICommunicationHandler
2525
} from "@scramjet/types";
2626
import {
2727
AppError,
@@ -165,12 +165,12 @@ export class CSIController extends TypedEmitter<Events> {
165165
public communicationHandler: ICommunicationHandler,
166166
private sthConfig: STHConfiguration,
167167
private hostProxy: HostProxy,
168-
private adapter: STHConfiguration["runtimeAdapter"] = sthConfig.runtimeAdapter,
168+
private adapter: STHConfiguration["runtimeAdapter"]= sthConfig.runtimeAdapter
169169
) {
170170
super();
171171

172172
this.id = this.handshakeMessage.id;
173-
this.sequence = this.handshakeMessage.sequence;
173+
this.sequence = this.handshakeMessage.sequenceInfo;
174174
this.appConfig = this.handshakeMessage.payload.appConfig;
175175
this.args = this.handshakeMessage.payload.args;
176176
this.outputTopic = this.handshakeMessage.payload.outputTopic;
@@ -186,12 +186,14 @@ export class CSIController extends TypedEmitter<Events> {
186186
this.logger.debug("Constructor executed");
187187

188188
this.status = InstanceStatus.INITIALIZING;
189+
190+
189191
}
190192

191193
async start() {
192194
const i = new Promise((res, rej) => {
193195
this.initResolver = { res, rej };
194-
//this.startInstance();
196+
this.startInstance();
195197
});
196198

197199
i.then(() => this.main()).catch(async (e) => {
@@ -256,6 +258,8 @@ export class CSIController extends TypedEmitter<Events> {
256258

257259
this._instanceAdapter.logger.pipe(this.logger, { end: false });
258260

261+
this.endOfSequence = this._instanceAdapter.waitUntilExit(undefined, this.id, this.sequence)
262+
259263
// @todo this also is moved to CSIDispatcher in entirety
260264
const instanceMain = async () => {
261265
try {
@@ -507,7 +511,7 @@ export class CSIController extends TypedEmitter<Events> {
507511
}
508512

509513
this.info.ports = message[1].ports;
510-
this.sequence = message[1].sequence;
514+
this.sequence = message[1].sequenceInfo;
511515

512516
// TODO: add message to initiate the instance adapter
513517

@@ -525,7 +529,7 @@ export class CSIController extends TypedEmitter<Events> {
525529
}
526530

527531
this.info.started = new Date();
528-
this.logger.info("Instance started", this.info);
532+
this.logger.info("Instance started", JSON.stringify(message, undefined, 4));
529533
}
530534

531535
async handleInstanceConnect(streams: DownstreamStreamsConfig) {

packages/host/src/lib/csi-dispatcher.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ export class CSIDispatcher extends TypedEmitter<Events> {
3838

3939
async createCSIController(
4040
id: string,
41-
sequence: SequenceInfo,
41+
sequenceInfo: SequenceInfo,
4242
payload: StartSequencePayload,
4343
communicationHandler: ICommunicationHandler,
4444
config: STHConfiguration,
4545
instanceProxy: HostProxy) {
46-
sequence.instances = sequence.instances || new Set();
47-
const csiController = new CSIController({ id, sequence, payload }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);
46+
sequenceInfo.instances = sequenceInfo.instances || new Set();
47+
const csiController = new CSIController({ id, sequenceInfo, payload }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);
4848

4949
csiController.logger.pipe(this.logger);
50-
this.logger.trace("CSIController created", id);
50+
this.logger.trace("CSIController created", id, sequenceInfo);
5151

5252
csiController.logger.pipe(this.logger, { end: false });
5353
communicationHandler.logger.pipe(this.logger, { end: false });
@@ -108,7 +108,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
108108

109109
delete InstanceStore[csiController.id];
110110

111-
sequence.instances.delete(id);
111+
sequenceInfo.instances.delete(id);
112112

113113
// await this.cpmConnector?.sendInstanceInfo({
114114
// id: csiController.id,
@@ -138,13 +138,14 @@ export class CSIDispatcher extends TypedEmitter<Events> {
138138
this.emit("terminated", { id, code });
139139
});
140140

141-
await csiController.start();
141+
csiController.start();
142142

143143
this.logger.trace("csiController started", id);
144144

145-
sequence.instances.add(id);
145+
sequenceInfo.instances.add(id);
146146

147147
this.instancesStore[id] = csiController;
148+
148149
return csiController;
149150
}
150151

@@ -169,6 +170,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
169170
sequence,
170171
payload
171172
);
173+
172174
// @todo more instance info
173175
return {
174176
id,

packages/host/src/lib/host.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,7 @@ export class Host implements IComponent {
956956
this.config,
957957
this.instanceProxy);
958958
}
959+
959960
await this.instancesStore[id].handleInstanceConnect(
960961
streams
961962
);

packages/model/src/stream-handler.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
EncodedMonitoringMessage,
88
ICommunicationHandler,
99
IObjectLogger,
10+
InstanceConnectionInfo,
1011
LoggerOutput,
1112
MaybePromise,
1213
MessageDataType,
@@ -147,6 +148,14 @@ export class CommunicationHandler implements ICommunicationHandler {
147148
return this;
148149
}
149150

151+
waitForHandshake(): Promise<InstanceConnectionInfo> {
152+
return new Promise((res) => {
153+
this.addMonitoringHandler(RunnerMessageCode.PING, (msg) => {
154+
res(msg);
155+
});
156+
})
157+
}
158+
150159
pipeMessageStreams() {
151160
if (this._piped) {
152161
this.logger.error("pipeMessageStreams called twice");

packages/runner/src/bin/start-runner.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,26 @@ import fs from "fs";
55
import { AppConfig, SequenceInfo } from "@scramjet/types";
66
import { HostClient } from "../host-client";
77
import { RunnerExitCode } from "@scramjet/symbols";
8+
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
89

910
const sequencePath: string = process.env.SEQUENCE_PATH?.replace(/.js$/, "") + ".js";
1011
const instancesServerPort = process.env.INSTANCES_SERVER_PORT;
1112
const instancesServerHost = process.env.INSTANCES_SERVER_HOST;
1213
const instanceId = process.env.INSTANCE_ID;
1314
const sequenceInfo = process.env.SEQUENCE_INFO;
15+
const runnerConnectInfo = process.env.RUNNER_CONNECT_INFO;
1416

1517
let connectInfo: SequenceInfo;
18+
let parsedRunnerConnectInfo: RunnerConnectInfo;
19+
20+
try {
21+
if (!runnerConnectInfo) throw new Error("Connection JSON is required.");
22+
parsedRunnerConnectInfo = JSON.parse(runnerConnectInfo);
23+
} catch {
24+
console.error("Error while parsing connection information.");
25+
process.exit(RunnerExitCode.INVALID_ENV_VARS);
26+
}
27+
1628

1729
try {
1830
if (!sequenceInfo) throw new Error("Connection JSON is required.");
@@ -54,7 +66,7 @@ const hostClient = new HostClient(+instancesServerPort, instancesServerHost);
5466
* @param fifosPath - fifo files path
5567
*/
5668

57-
const runner: Runner<AppConfig> = new Runner(sequencePath, hostClient, instanceId, connectInfo);
69+
const runner: Runner<AppConfig> = new Runner(sequencePath, hostClient, instanceId, connectInfo, parsedRunnerConnectInfo);
5870

5971
runner.main()
6072
.catch(e => {

packages/runner/src/runner.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream";
3333
import { MessageUtils } from "./message-utils";
3434
import { HostClient as HostApiClient } from "@scramjet/api-client";
3535
import { ClientUtilsCustomAgent } from "@scramjet/client-utils";
36+
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
3637

3738
// async function flushStream(source: Readable | undefined, target: Writable) {
3839
// if (!source) return;
@@ -122,15 +123,22 @@ export class Runner<X extends AppConfig> implements IComponent {
122123
private outputDataStream: DataStream;
123124
private sequenceInfo: SequenceInfo;
124125

126+
private runnerConnectInfo: RunnerConnectInfo = {
127+
appConfig: {}
128+
};
129+
125130
constructor(
126131
private sequencePath: string,
127132
private hostClient: IHostClient,
128133
private instanceId: string,
129-
sequenceInfo: SequenceInfo
134+
sequenceInfo: SequenceInfo,
135+
runnerConnectInfo: RunnerConnectInfo
130136
) {
131137
this.sequenceInfo = sequenceInfo;
132138
this.emitter = new EventEmitter();
133139

140+
this.runnerConnectInfo = runnerConnectInfo;
141+
134142
this.logger = new ObjLogger(this, { id: instanceId });
135143
hostClient.logger.pipe(this.logger);
136144

@@ -455,7 +463,12 @@ export class Runner<X extends AppConfig> implements IComponent {
455463

456464
sendHandshakeMessage() {
457465
// TODO: send connection info
458-
MessageUtils.writeMessageOnStream([RunnerMessageCode.PING, { id: this.instanceId, sequenceInfo: this.sequenceInfo }], this.hostClient.monitorStream);
466+
MessageUtils.writeMessageOnStream([
467+
RunnerMessageCode.PING, {
468+
id: this.instanceId,
469+
sequenceInfo: this.sequenceInfo,
470+
payload: this.runnerConnectInfo
471+
}], this.hostClient.monitorStream);
459472

460473
this.logger.trace("Handshake sent");
461474
}

packages/types/src/lifecycle-adapters.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export interface ILifeCycleAdapterMain {
3131

3232
getCrashLog(): Promise<string[]>;
3333

34-
waitUntilExit(config: InstanceConfig, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode>;
34+
waitUntilExit(config: InstanceConfig | undefined, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode>;
3535
}
3636
// @TODO create ISequenceAdapter interface
3737

0 commit comments

Comments
 (0)