Skip to content

Commit e233b91

Browse files
committed
Fail. runner not exits
1 parent a3e50ec commit e233b91

File tree

6 files changed

+69
-57
lines changed

6 files changed

+69
-57
lines changed

.vscode/launch.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
"outFiles": [
3232
"${workspaceFolder}/**/*.js"
3333
]
34+
},
35+
{
36+
"name": "Attach to Process",
37+
"type": "node",
38+
"request": "attach",
39+
"processId": "${command:PickProcess}"
3440
}
3541
]
3642
}

packages/adapters/src/process-instance-adapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class ProcessInstanceAdapter implements
105105
...debugFlags,
106106
path.resolve(__dirname,
107107
process.env.ESBUILD
108-
? "../../runner/bin/start-runner.js"
108+
? !isTSNode ? "../../runner/bin/start-runner.js" : "../../runner/src/bin/start-runner.ts"
109109
: require.resolve("@scramjet/runner")
110110
)
111111
];
@@ -172,7 +172,8 @@ class ProcessInstanceAdapter implements
172172
this.crashLogStreams = Promise.all([runnerProcess.stdout, runnerProcess.stderr].map(streamToString));
173173

174174
this.logger.trace("Runner process is running", runnerProcess.pid);
175-
175+
this.runnerProcess?.stderr?.pipe(process.stdout);
176+
this.runnerProcess?.stdout?.pipe(process.stdout);
176177
this.runnerProcess = runnerProcess;
177178
}
178179

packages/host/src/lib/csi-controller.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ export class CSIController extends TypedEmitter<Events> {
226226

227227
async main() {
228228
this.status = InstanceStatus.RUNNING;
229-
this.logger.trace("Instance started");
229+
this.logger.trace("Instance status: RUNNING");
230230

231231
let code = -1;
232232

@@ -545,7 +545,7 @@ export class CSIController extends TypedEmitter<Events> {
545545
}
546546

547547
this.info.started = new Date();
548-
this.logger.info("Instance started", JSON.stringify(message, undefined));
548+
this.logger.info("Instance started", JSON.stringify(message));
549549
}
550550

551551
async handleInstanceConnect(streams: DownstreamStreamsConfig) {

packages/runner/src/host-client.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-console */
12
/* eslint-disable dot-notation */
23
import { ObjLogger } from "@scramjet/obj-logger";
34
import { CommunicationChannel as CC } from "@scramjet/symbols";
@@ -48,7 +49,7 @@ class HostClient implements IHostClient {
4849

4950
try {
5051
connection = net.createConnection(this.instancesServerPort, this.instancesServerHost);
51-
connection.on("error", () => {});
52+
connection.on("error", (error) => { this.logger.error("Connection error", error); });
5253
connection.setNoDelay(true);
5354
} catch (e) {
5455
return Promise.reject(e);
@@ -118,17 +119,21 @@ class HostClient implements IHostClient {
118119
const streamsExitedPromised: Promise<void>[] = this.streams.map((stream, i) =>
119120
new Promise(
120121
(res) => {
122+
console.log("Disconnecting", i);
121123
if ("writable" in stream!) {
122124
stream
123125
.on("error", (e) => {
124126
console.error("Error on stream", i, e.stack);
127+
res();
125128
})
126129
.on("close", () => {
130+
console.log("Closed", i);
127131
res();
128132
})
129133
.end();
130134
} else {
131135
stream!.destroy();
136+
console.log("destroyed", i);
132137
res();
133138
}
134139
}

packages/runner/src/runner-app-context.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11

22
import { ObjLogger } from "@scramjet/obj-logger";
33
import {
4-
EventMessageData, KeepAliveMessageData, MonitoringMessageFromRunnerData,
5-
AppConfig, AppError, AppErrorConstructor, AppContext, WritableStream,
6-
FunctionDefinition, KillHandler, StopHandler, MonitoringHandler, IObjectLogger, HostClient, ManagerClient
4+
AppConfig,
5+
AppContext,
6+
AppError, AppErrorConstructor,
7+
EventMessageData,
8+
FunctionDefinition,
9+
HostClient,
10+
IObjectLogger,
11+
KeepAliveMessageData,
12+
KillHandler,
13+
ManagerClient,
14+
MonitoringHandler,
15+
MonitoringMessageFromRunnerData,
16+
StopHandler,
17+
WritableStream
718
} from "@scramjet/types";
819
import { EventEmitter } from "events";
920

@@ -36,14 +47,14 @@ implements AppContext<AppConfigType, State> {
3647
space: ManagerClient;
3748
instanceId: string;
3849

39-
constructor(config: AppConfigType, monitorStream: WritableStream<any>,
40-
emitter: EventEmitter, runner: RunnerProxy, hostClient: HostClient, spaceClient: ManagerClient, id: string) {
50+
constructor(id: string, config: AppConfigType, monitorStream: WritableStream<any>,
51+
emitter: EventEmitter, runner: RunnerProxy, hostClient: HostClient, spaceClient?: ManagerClient) {
4152
this.config = config;
4253
this.monitorStream = monitorStream;
4354
this.emitter = emitter;
4455
this.runner = runner;
4556
this.hub = hostClient;
46-
this.space = spaceClient;
57+
this.space = spaceClient!;
4758
this.instanceId = id;
4859
}
4960

packages/runner/src/runner.ts

Lines changed: 35 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,12 @@ import { Readable, Writable } from "stream";
3030

3131
import { HostClient as HostApiClient } from "@scramjet/api-client";
3232
import { ClientUtilsCustomAgent } from "@scramjet/client-utils";
33-
import { ManagerClient } from "@scramjet/manager-api-client";
3433
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
3534
import { writeFileSync } from "fs";
3635
import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream";
3736
import { MessageUtils } from "./message-utils";
3837
import { RunnerAppContext, RunnerProxy } from "./runner-app-context";
3938

40-
process.once("beforeExit", (code) => {
41-
const filepath = `/tmp/runner-${process.pid.toString()}`;
42-
43-
writeFileSync(filepath, code.toString());
44-
45-
// eslint-disable-next-line no-console
46-
console.log("Runner exit");
47-
});
48-
4939
// async function flushStream(source: Readable | undefined, target: Writable) {
5040
// if (!source) return;
5141

@@ -74,26 +64,10 @@ export function isSynchronousStreamable(obj: SynchronousStreamable<any> | Primit
7464

7565
const overrideMap: Map<Writable, OverrideConfig> = new Map();
7666

77-
function revertStandardStream(oldStream: Writable) {
78-
if (overrideMap.has(oldStream)) {
79-
const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig;
80-
81-
// @ts-ignore - this is ok, we're doing this on purpose!
82-
delete oldStream.write;
83-
84-
// if prototypic write is there, then no change needed
85-
if (oldStream.write !== write)
86-
oldStream.write = write;
87-
88-
oldStream.off("drain", drainCb);
89-
oldStream.off("error", errorCb);
90-
overrideMap.delete(oldStream);
91-
}
92-
}
93-
9467
function overrideStandardStream(oldStream: Writable, newStream: Writable) {
9568
if (overrideMap.has(oldStream)) {
9669
//throw new Error("Attempt to override stream more than once");
70+
// eslint-disable-next-line no-use-before-define
9771
revertStandardStream(oldStream);
9872
}
9973

@@ -115,6 +89,23 @@ function overrideStandardStream(oldStream: Writable, newStream: Writable) {
11589
overrideMap.set(oldStream, { write, drainCb, errorCb });
11690
}
11791

92+
function revertStandardStream(oldStream: Writable) {
93+
if (overrideMap.has(oldStream)) {
94+
const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig;
95+
96+
// @ts-ignore - this is ok, we're doing this on purpose!
97+
delete oldStream.write;
98+
99+
// if prototypic write is there, then no change needed
100+
if (oldStream.write !== write)
101+
oldStream.write = write;
102+
103+
oldStream.off("drain", drainCb);
104+
oldStream.off("error", errorCb);
105+
overrideMap.delete(oldStream);
106+
}
107+
}
108+
118109
/**
119110
* Runtime environment for sequence code.
120111
* Communicates with Host with data transferred to/from Sequence, health info,
@@ -165,11 +156,17 @@ export class Runner<X extends AppConfig> implements IComponent {
165156
throw e;
166157
});
167158

168-
this.outputDataStream = new DataStream({ highWaterMark: 0 }).catch((e: any) => {
159+
this.outputDataStream = new DataStream().catch((e: any) => {
169160
this.logger.error("Error during input data stream", e);
170161

171162
throw e;
172163
});
164+
165+
process.on("beforeExit", (code) => {
166+
const filepath = `/tmp/runner-${process.pid.toString()}`;
167+
168+
writeFileSync(filepath, code.toString());
169+
});
173170
}
174171

175172
get context(): RunnerAppContext<X, any> {
@@ -261,17 +258,17 @@ export class Runner<X extends AppConfig> implements IComponent {
261258
[RunnerMessageCode.MONITORING, { healthy }], this.hostClient.monitorStream
262259
);
263260

264-
this.monitoringMessageReplyTimeout = setTimeout(async () => {
265-
await this.handleDisconnect();
261+
this.monitoringMessageReplyTimeout = setTimeout(() => {
262+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
263+
this.handleDisconnect();
266264
}, 500);
267265
}
268266

269267
async handleDisconnect() {
270268
this.logger.info("Reinitializing....");
271269

272-
this.premain().catch((e) => {
273-
this.logger.error("Premain error", e);
274-
});
270+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
271+
this.premain();
275272
}
276273

277274
async handleKillRequest(): Promise<void> {
@@ -330,13 +327,9 @@ export class Runner<X extends AppConfig> implements IComponent {
330327
try {
331328
await this.hostClient.init(this.instanceId);
332329
} catch (e) {
333-
this.logger.error("hostClient init error", e);
334-
335330
await defer(2000);
336-
337-
this.premain().catch((err: any) => {
338-
this.logger.error("Premain error", err);
339-
});
331+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
332+
this.premain();
340333
}
341334

342335
this.redirectOutputs();
@@ -480,9 +473,6 @@ export class Runner<X extends AppConfig> implements IComponent {
480473
const hostClientUtils = new ClientUtilsCustomAgent("http://scramjet-host/api/v1", this.hostClient.getAgent());
481474
const hostApiClient = new HostApiClient("http://scramjet-host/api/v1", hostClientUtils);
482475

483-
const managerClientUtils = new ClientUtilsCustomAgent("http://scramjet-host/api/v1/cpm/api/v1", this.hostClient.getAgent());
484-
const managerApiClient = new ManagerClient("http://scramjet-host/api/v1/cpm/api/v1", managerClientUtils);
485-
486476
const runner: RunnerProxy = {
487477
keepAliveIssued: () => this.keepAliveIssued(),
488478
sendStop: (err?: Error) => {
@@ -493,13 +483,12 @@ export class Runner<X extends AppConfig> implements IComponent {
493483
};
494484

495485
this._context = new RunnerAppContext(
486+
this.instanceId,
496487
config,
497488
this.hostClient.monitorStream,
498489
this.emitter,
499490
runner,
500-
hostApiClient as HostClient,
501-
managerApiClient as ManagerClient,
502-
this.instanceId
491+
hostApiClient as HostClient
503492
);
504493
this._context.logger.pipe(this.logger);
505494

@@ -612,7 +601,7 @@ export class Runner<X extends AppConfig> implements IComponent {
612601
if (intermediate instanceof Readable) {
613602
stream = intermediate;
614603
} else if (intermediate !== undefined && isSynchronousStreamable(intermediate)) {
615-
stream = Object.assign(DataStream.from(intermediate as Readable, { highWaterMark: 0 }), {
604+
stream = Object.assign(DataStream.from(intermediate as Readable), {
616605
topic: intermediate.topic,
617606
contentType: intermediate.contentType
618607
});

0 commit comments

Comments
 (0)