Skip to content

Commit a3e50ec

Browse files
committed
Reconnect. k8s adapter run
1 parent 7fb3644 commit a3e50ec

File tree

4 files changed

+30
-27
lines changed

4 files changed

+30
-27
lines changed

packages/adapters/src/kubernetes-instance-adapter.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ IComponent {
3939
private adapterConfig: K8SAdapterConfiguration;
4040
private _limits?: InstanceLimits = {};
4141

42+
stdErrorStream?: PassThrough;
43+
4244
get limits() { return this._limits || {} as InstanceLimits; }
4345
private set limits(value: InstanceLimits) { this._limits = value; }
4446

@@ -89,17 +91,13 @@ IComponent {
8991
}
9092
};
9193
}
92-
async dispatch(_config: InstanceConfig, _instancesServerPort: number, _instanceId: string, _sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise<void> {
93-
throw Error("not implemented");
94-
}
95-
96-
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode> {
94+
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise<void> {
9795
if (config.type !== "kubernetes") {
9896
throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`);
9997
}
10098

10199
if (this.adapterConfig.quotaName && await this.kubeClient.isPodsLimitReached(this.adapterConfig.quotaName)) {
102-
return RunnerExitCode.PODS_LIMIT_REACHED;
100+
throw Error(RunnerExitCode.PODS_LIMIT_REACHED.toString());
103101
}
104102

105103
this.limits = config.limits;
@@ -154,25 +152,27 @@ IComponent {
154152
// This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly.
155153
// Return 1 which is Linux exit code for "General Error" since we are not able
156154
// to determine what happened exactly.
157-
return startPodStatus.code || 137;
155+
return;
158156
}
159157

160158
this.logger.debug("Copy sequence files to Runner");
161159

162160
const compressedStream = createReadStream(path.join(config.sequenceDir, "compressed.tar.gz"));
163-
const stdErrorStream = new PassThrough();
164161

165-
stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); });
162+
this.stdErrorStream = new PassThrough();
163+
this.stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); });
166164

167-
await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, stdErrorStream, compressedStream, 2);
165+
await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, this.stdErrorStream, compressedStream, 2);
166+
}
168167

169-
const exitPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Succeeded", "Failed", "Unknown"]);
168+
async waitUntilExit(_config: InstanceConfig, _instanceId: string, _sequenceInfo: SequenceInfo): Promise<ExitCode> {
169+
const exitPodStatus = await this.kubeClient.waitForPodStatus(this._runnerName!, ["Succeeded", "Failed", "Unknown"]);
170170

171-
stdErrorStream.end();
171+
this.stdErrorStream?.end();
172172

173173
if (exitPodStatus.status !== "Succeeded") {
174174
this.logger.error("Runner stopped incorrectly", exitPodStatus);
175-
this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(runnerName));
175+
this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(this._runnerName!));
176176

177177
return exitPodStatus.code || 137;
178178
}
@@ -185,9 +185,9 @@ IComponent {
185185
return 0;
186186
}
187187

188-
async waitUntilExit(_config: InstanceConfig, _instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
189-
this.logger.debug("WaitUntilExit", [_config, _instanceId, _sequenceInfo]);
190-
throw Error("Not implemented");
188+
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
189+
await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload);
190+
return this.waitUntilExit(config, instanceId, sequenceInfo);
191191
}
192192

193193
async cleanup(): Promise<void> {

packages/adapters/src/process-instance-adapter.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ class ProcessInstanceAdapter implements
6060

6161
if (!runnerProcess) {
6262
// Runner process not initialized yet
63-
return msg;
63+
return {
64+
...msg,
65+
processId: this.processPID
66+
};
6467
}
6568

6669
return {
@@ -170,10 +173,6 @@ class ProcessInstanceAdapter implements
170173

171174
this.logger.trace("Runner process is running", runnerProcess.pid);
172175

173-
// @todo exit here with pid
174-
// then promise waiting for process with given pid finish (endOfRun)
175-
// how to connect to a process knowing id of it?
176-
177176
this.runnerProcess = runnerProcess;
178177
}
179178

packages/model/src/stream-handler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ export class CommunicationHandler implements ICommunicationHandler {
163163
this.logger.error("pipeMessageStreams called twice");
164164
throw new Error("pipeMessageStreams called twice");
165165
}
166+
166167
this._piped = true;
167168

168169
if (!this.downstreams || !this.upstreams) {

packages/runner/src/runner.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream";
3737
import { MessageUtils } from "./message-utils";
3838
import { RunnerAppContext, RunnerProxy } from "./runner-app-context";
3939

40+
process.once("beforeExit", (code) => {
41+
const filepath = `/tmp/runner-${process.pid.toString()}`;
42+
43+
writeFileSync(filepath, code.toString());
44+
45+
// eslint-disable-next-line no-console
46+
console.log("Runner exit");
47+
});
48+
4049
// async function flushStream(source: Readable | undefined, target: Writable) {
4150
// if (!source) return;
4251

@@ -161,12 +170,6 @@ export class Runner<X extends AppConfig> implements IComponent {
161170

162171
throw e;
163172
});
164-
165-
process.on("beforeExit", (code) => {
166-
const filepath = `/tmp/runner-${process.pid.toString()}`;
167-
168-
writeFileSync(filepath, code.toString());
169-
});
170173
}
171174

172175
get context(): RunnerAppContext<X, any> {

0 commit comments

Comments
 (0)