Skip to content

Commit 156ab2b

Browse files
committed
Restore piping instance
1 parent 12a087e commit 156ab2b

File tree

4 files changed

+75
-56
lines changed

4 files changed

+75
-56
lines changed

packages/host/src/lib/csi-controller.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type Events = {
6363

6464
const BPMux = require("bpmux").BPMux;
6565

66+
export type CSIControllerInfo = { ports?: any; created?: Date; started?: Date; ended?: Date; };
6667
/**
6768
* Handles all Instance lifecycle, exposes instance's HTTP API.
6869
*
@@ -95,7 +96,7 @@ export class CSIController extends TypedEmitter<Events> {
9596
args: Array<any> | undefined;
9697
controlDataStream?: DataStream;
9798
router?: APIRoute;
98-
info: { ports?: any; created?: Date; started?: Date; ended?: Date; } = {};
99+
info: CSIControllerInfo = {};
99100
status: InstanceStatus;
100101
terminated?: { exitcode: number; reason: string; };
101102
provides?: string;
@@ -111,6 +112,8 @@ export class CSIController extends TypedEmitter<Events> {
111112
apiOutput = new PassThrough();
112113
apiInputEnabled = true;
113114

115+
executionTime: number = -1;
116+
114117
/**
115118
* Topic to which the output stream should be routed
116119
*/
@@ -186,11 +189,6 @@ export class CSIController extends TypedEmitter<Events> {
186189

187190
this.logger = new ObjLogger(this, { id: this.id });
188191

189-
this.logger.debug("Constructor executed", arguments);
190-
191-
// eslint-disable-next-line no-console
192-
console.log("Constructor executed", arguments);
193-
194192
this.status = InstanceStatus.INITIALIZING;
195193

196194
this.upStreams = [
@@ -205,8 +203,8 @@ export class CSIController extends TypedEmitter<Events> {
205203
];
206204
}
207205

208-
async start() {
209-
const i = new Promise((res, rej) => {
206+
async start(): Promise<void> {
207+
const i = new Promise<void>((res, rej) => {
210208
this.initResolver = { res, rej };
211209
this.startInstance();
212210
});

packages/host/src/lib/csi-dispatcher.ts

Lines changed: 63 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ import { RunnerMessageCode } from "@scramjet/symbols";
55
import { HostProxy, ICommunicationHandler, IObjectLogger, InstanceConfig, MessageDataType, STHConfiguration, STHRestAPI, SequenceInfo } from "@scramjet/types";
66
import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth";
77
import { TypedEmitter } from "@scramjet/utility";
8-
import { CSIController } from "./csi-controller";
8+
import { CSIController, CSIControllerInfo } from "./csi-controller";
99
import { InstanceStore } from "./instance-store";
10-
import SequenceStore from "./sequenceStore";
11-
import { SocketServer } from "./socket-server";
10+
import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter";
11+
import { ContentType } from "./serviceDiscovery/contentType";
12+
import TopicId from "./serviceDiscovery/topicId";
13+
import { Readable, Writable } from "stream";
1214

1315
type errorEventData = {id:string, err: any }
14-
type endEventData = {id:string, code:number }
16+
type endEventData = { id: string, code: number, info: CSIControllerInfo & { executionTime: number }, sequence: SequenceInfo};
17+
1518
type Events = {
1619
pang: (payload: MessageDataType<RunnerMessageCode.PANG>) => void;
1720
hourChime: () => void;
@@ -24,16 +27,16 @@ type Events = {
2427

2528
export class CSIDispatcher extends TypedEmitter<Events> {
2629
public logger: IObjectLogger;
27-
//private socketServer: SocketServer;
2830
public instancesStore: typeof InstanceStore;
2931
private STHConfig: STHConfiguration;
32+
private serviceDiscovery: ServiceDiscovery;
3033

31-
constructor(_socketServer: SocketServer, instancesStore: typeof InstanceStore, _sequenceStore: SequenceStore, STHConfig: STHConfiguration) {
34+
constructor(instancesStore: typeof InstanceStore, serviceDiscovery: ServiceDiscovery, STHConfig: STHConfiguration) {
3235
super();
3336
this.logger = new ObjLogger(this);
34-
//this.socketServer = socketServer;
3537
this.instancesStore = instancesStore;
3638
this.STHConfig = STHConfig;
39+
this.serviceDiscovery = serviceDiscovery;
3740
}
3841

3942
async createCSIController(
@@ -53,7 +56,6 @@ export class CSIDispatcher extends TypedEmitter<Events> {
5356
communicationHandler.logger.pipe(this.logger, { end: false });
5457

5558
csiController.on("error", (err) => {
56-
//this.pushTelemetry("Instance error", { ...err }, "error");
5759
this.logger.error("CSIController errored", err.message, err.exitcode);
5860
this.emit("error", { id, err });
5961
});
@@ -67,47 +69,46 @@ export class CSIDispatcher extends TypedEmitter<Events> {
6769
}
6870

6971
if (data.requires && !csiController.inputRouted && data.contentType) {
70-
this.logger.trace("Routing Sequence input to topic", data.requires);
72+
this.logger.trace("Routing Sequence topic to input", data.requires);
7173

72-
// await this.serviceDiscovery.routeTopicToStream(
73-
// { topic: data.requires, contentType: data.contentType! },
74-
// csiController.getInputStream()
75-
// );
74+
await this.serviceDiscovery.routeTopicToStream(
75+
{ topic: new TopicId(data.requires), contentType: data.contentType as ContentType },
76+
csiController.getInputStream()
77+
);
7678

7779
csiController.inputRouted = true;
7880

79-
// await this.serviceDiscovery.update({
80-
// requires: data.requires, contentType: data.contentType!, topicName: data.requires
81-
// });
81+
await this.serviceDiscovery.update({
82+
requires: data.requires, contentType: data.contentType, topicName: data.requires
83+
});
8284
}
8385

8486
if (data.provides && !csiController.outputRouted && data.contentType) {
8587
this.logger.trace("Routing Sequence output to topic", data.provides);
86-
// await this.serviceDiscovery.routeStreamToTopic(
87-
// csiController.getOutputStream(),
88-
// { topic: data.provides, contentType: data.contentType! },
89-
// csiController.id
90-
// );
88+
await this.serviceDiscovery.routeStreamToTopic(
89+
csiController.getOutputStream(),
90+
{ topic: new TopicId(data.provides), contentType: data.contentType as ContentType }
91+
);
9192

9293
csiController.outputRouted = true;
9394

94-
// await this.serviceDiscovery.update({
95-
// provides: data.provides, contentType: data.contentType!, topicName: data.provides
96-
// });
95+
await this.serviceDiscovery.update({
96+
provides: data.provides, contentType: data.contentType!, topicName: data.provides
97+
});
9798
}
9899
});
99100

100101
csiController.on("end", async (code) => {
101-
this.logger.trace("csiControllerontrolled ended", `Exit code: ${code}`);
102-
103-
// if (csiController.provides && csiController.provides !== "") {
104-
// csiController.getOutputStream()!.unpipe(this.serviceDiscovery.getData(
105-
// {
106-
// topic: csiController.provides,
107-
// contentType: ""
108-
// }
109-
// ) as Writable);
110-
// }
102+
this.logger.trace("csiControllerontrolled ended", `id: ${csiController.id}`, `Exit code: ${code}`);
103+
104+
if (csiController.provides && csiController.provides !== "") {
105+
csiController.getOutputStream().unpipe(this.serviceDiscovery.getData(
106+
{
107+
topic: new TopicId(csiController.provides),
108+
contentType: "" as ContentType
109+
}
110+
) as Writable);
111+
}
111112

112113
csiController.logger.unpipe(this.logger);
113114

@@ -123,17 +124,24 @@ export class CSIDispatcher extends TypedEmitter<Events> {
123124
// }, InstanceMessageCode.INSTANCE_ENDED);
124125

125126
// this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED);
126-
this.emit("end", { id, code });
127+
this.emit("end", {
128+
id,
129+
code,
130+
info: {
131+
executionTime: csiController.executionTime
132+
},
133+
sequence: csiController.sequence
134+
});
127135
});
128136

129137
csiController.once("terminated", (code) => {
130-
// if (csiController.requires && csiController.requires !== "") {
131-
// (this.serviceDiscovery.getData({
132-
// topic: csiController.requires,
133-
// contentType: "",
134-
// }) as Readable
135-
// ).unpipe(csiController.getInputStream()!);
136-
// }
138+
if (csiController.requires && csiController.requires !== "") {
139+
(this.serviceDiscovery.getData({
140+
topic: new TopicId(csiController.requires),
141+
contentType: "" as ContentType,
142+
}) as Readable
143+
).unpipe(csiController.getInputStream()!);
144+
}
137145

138146
// this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED);
139147
// this.pushTelemetry("Instance ended", {
@@ -144,11 +152,20 @@ export class CSIDispatcher extends TypedEmitter<Events> {
144152
// code: code.toString(),
145153
// seqId: csiController.sequence.id
146154
// });
147-
this.emit("terminated", { id, code });
155+
156+
this.emit("terminated", {
157+
id,
158+
code,
159+
info: {
160+
executionTime: csiController.executionTime
161+
},
162+
sequence: csiController.sequence
163+
});
148164
});
149165

150-
csiController.start().catch(() => {
151-
//@TODO: handle start error;
166+
csiController.start().catch((e) => {
167+
this.logger.error("CSIC start error", csiController.id, e);
168+
throw new Error("CSIC start error");
152169
});
153170

154171
this.logger.trace("csiController started", id);
@@ -189,8 +206,8 @@ export class CSIDispatcher extends TypedEmitter<Events> {
189206
await new Promise<void>((resolve, _reject) => {
190207
const resolveFunction = (eventId: string) => {
191208
if (eventId === id) {
192-
resolve();
193209
this.off("established", resolveFunction);
210+
resolve();
194211
}
195212
};
196213

packages/host/src/lib/host.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ export class Host implements IComponent {
239239
this.instanceBase = `${this.config.host.apiBase}/instance`;
240240
this.topicsBase = `${this.config.host.apiBase}/topic`;
241241

242-
this.csiDispatcher = new CSIDispatcher(this.socketServer, this.instancesStore, this.sequenceStore, sthConfig);
242+
this.csiDispatcher = new CSIDispatcher(this.instancesStore, this.serviceDiscovery, sthConfig);
243243

244244
this.csiDispatcher.logger.pipe(this.logger);
245245

@@ -273,7 +273,9 @@ export class Host implements IComponent {
273273
}
274274

275275
attachDispatcherEvents() {
276-
//this.csiDispatcher.on();
276+
this.csiDispatcher.on("error", (errorData) => {
277+
this.pushTelemetry("Instance error", { ...errorData }, "error");
278+
});
277279
}
278280

279281
getId() {

packages/host/src/lib/serviceDiscovery/sd-adapter.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ export class ServiceDiscovery {
7676
const topic = this.topicsController.get(topicName);
7777

7878
if (topic) {
79+
config.contentType ||= topic.contentType;
80+
7981
if (topic.contentType !== config.contentType) {
8082
this.logger.error("Content-type mismatch, existing and requested ", topic.contentType, config.contentType);
8183
throw new Error("Content-type mismatch");

0 commit comments

Comments
 (0)