Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/adapters/src/process-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { constants } from "fs";
import { access, readFile, rm } from "fs/promises";
import path from "path";
import { getRunnerEnvVariables } from "./get-runner-env";
import { development } from "@scramjet/sth-config";

const isTSNode = !!(process as any)[Symbol.for("ts-node.register.instance")];
const gotPython = "\n _ \n __ _____ _ __ ___ ___| |\n \\ \\ /\\ / / _ \\| '_ \\/ __|_ / |\n \\ V V / (_) | | | \\__ \\/ /|_|\n \\_/\\_/ \\___/|_| |_|___/___(_) 🐍\n";
Expand Down Expand Up @@ -152,6 +153,9 @@ class ProcessInstanceAdapter implements
config.sequenceDir,
config.entrypointPath
);

const extraEnvs = development() ? process.env : {};

const env = getRunnerEnvVariables({
sequencePath,
instancesServerHost: "127.0.0.1",
Expand All @@ -162,7 +166,8 @@ class ProcessInstanceAdapter implements
payload
}, {
PYTHONPATH: this.getPythonpath(config.sequenceDir),
...this.sthConfig.runnerEnvs
...this.sthConfig.runnerEnvs,
...extraEnvs
});

this.logger.debug("Spawning Runner process with command", runnerCommand);
Expand Down
6 changes: 3 additions & 3 deletions packages/host/src/lib/cpm-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { ObjLogger } from "@scramjet/obj-logger";
import { ReasonPhrases } from "http-status-codes";
import { DuplexStream } from "@scramjet/api-server";
import { VerserClientConnection } from "@scramjet/verser/src/types";
import { EOL, networkInterfaces } from "os";
import { networkInterfaces } from "os";

type STHInformation = {
id?: string;
Expand Down Expand Up @@ -253,13 +253,13 @@ export class CPMConnector extends TypedEmitter<Events> {
};
}

this.logger.info(`${EOL}${EOL}\t\x1b[33m${this.config.id} connected to ${this.cpmId}\x1b[0m${EOL} `);
this.logger.info(`Hub ${this.config.id} connected to ${this.cpmId}`);

StringStream.from(duplex.input as Readable)
.JSONParse()
.map(async (message: EncodedControlMessage) => {
this.logger.trace("Received message", message);
const messageCode = message[0] as CPMMessageCode;
const messageCode = message[0] as unknown as CPMMessageCode;

if (messageCode === CPMMessageCode.STH_ID) {
// eslint-disable-next-line no-extra-parens
Expand Down
14 changes: 13 additions & 1 deletion packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import { DataStream } from "scramjet";
import { getInstanceAdapter } from "@scramjet/adapters";
import { ObjLogger } from "@scramjet/obj-logger";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility";
import { cancellableDefer, CancellablePromise, defer, isSetSequenceEndpointPayloadDTO, promiseTimeout, TypedEmitter } from "@scramjet/utility";
import { ReasonPhrases } from "http-status-codes";
import { mapRunnerExitCode } from "./utils";

Expand Down Expand Up @@ -734,6 +734,18 @@ export class CSIController extends TypedEmitter<Events> {

this.router.op("post", "/_stop", (req) => this.handleStop(req), this.communicationHandler);
this.router.op("post", "/_kill", (req) => this.handleKill(req), this.communicationHandler);

this.router.op("post", "/set", async (req) => {
const { body } = req;

if (isSetSequenceEndpointPayloadDTO(body)) {
this.logger.debug("Setting instance", body);

await this.communicationHandler.sendControlMessage(RunnerMessageCode.SET, body);
}

return { opStatus: ReasonPhrases.OK };
});
}

private async handleEvent(event: ParsedMessage): Promise<OpResponse<STHRestAPI.SendEventResponse>> {
Expand Down
7 changes: 4 additions & 3 deletions packages/host/src/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export class Host implements IComponent {
this.logger = new ObjLogger(
this,
{},
ObjLogger.levels.find((l: LogLevel) => l.toLowerCase() === sthConfig.logLevel) ||
ObjLogger.levels.find((l: LogLevel) => l.toLowerCase() === sthConfig.logLevel.toLowerCase()) ||
ObjLogger.levels[ObjLogger.levels.length - 1]
);

Expand Down Expand Up @@ -592,7 +592,8 @@ export class Host implements IComponent {
await this.csiDispatcher.startRunner(sequence, {
appConfig: seqenceConfig.appConfig || {},
args: seqenceConfig.args,
instanceId: seqenceConfig.instanceId
instanceId: seqenceConfig.instanceId,
logLevel: this.logger.logLevel
});

this.logger.debug("Starting sequence based on config", seqenceConfig);
Expand Down Expand Up @@ -1305,7 +1306,7 @@ export class Host implements IComponent {
await new Promise<void>((res, _rej) => {
this.socketServer.server
?.once("close", () => {
this.logger.trace("Socket server stopped.");
this.logger.info("Socket server stopped.");

res();
})
Expand Down
2 changes: 2 additions & 0 deletions packages/model/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type ControlMessageHandlerList = {
[RunnerMessageCode.MONITORING_REPLY]: ConfiguredMessageHandler<RunnerMessageCode.MONITORING_REPLY>[];
[RunnerMessageCode.STOP]: ConfiguredMessageHandler<RunnerMessageCode.STOP>[];
[RunnerMessageCode.PONG]: ConfiguredMessageHandler<RunnerMessageCode.PONG>[];
[RunnerMessageCode.SET]: ConfiguredMessageHandler<RunnerMessageCode.SET>[];
[RunnerMessageCode.INPUT_CONTENT_TYPE]: ConfiguredMessageHandler<RunnerMessageCode.PONG>[];
[RunnerMessageCode.EVENT]: ConfiguredMessageHandler<RunnerMessageCode.EVENT>[];
[CPMMessageCode.STH_ID]: ConfiguredMessageHandler<CPMMessageCode.STH_ID>[];
Expand Down Expand Up @@ -92,6 +93,7 @@ export class CommunicationHandler implements ICommunicationHandler {
[RunnerMessageCode.STOP]: [],
[RunnerMessageCode.EVENT]: [],
[RunnerMessageCode.PONG]: [],
[RunnerMessageCode.SET]: [],
[RunnerMessageCode.INPUT_CONTENT_TYPE]: [],
[CPMMessageCode.STH_ID]: [],
[CPMMessageCode.KEY_REVOKED]: [],
Expand Down
45 changes: 40 additions & 5 deletions packages/obj-logger/src/obj-logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ type ObjLogPipeOptions = {
stringified?: boolean;
};

const noop = () => {};

const getCircularReplacer = () => {
const seen = new WeakSet();

Expand Down Expand Up @@ -65,7 +67,44 @@ export class ObjLogger implements IObjectLogger {
/**
* Log level.
*/
logLevel: LogLevel;
private _logLevel: LogLevel = "TRACE";

public get logLevel(): LogLevel {
return this._logLevel;
}
public set logLevel(value: LogLevel) {
this._logLevel = value;

this.trace = noop;
this.info = noop;
this.error = noop;
this.debug = noop;
this.fatal = noop;
this.warn = noop;

switch (value) {
case "TRACE":
this.trace = ObjLogger.prototype.trace;
// eslint-disable-next-line no-fallthrough
case "DEBUG":
this.debug = ObjLogger.prototype.debug;
// eslint-disable-next-line no-fallthrough
case "INFO":
this.info = ObjLogger.prototype.info;
// eslint-disable-next-line no-fallthrough
case "WARN":
this.warn = ObjLogger.prototype.warn;
// eslint-disable-next-line no-fallthrough
case "ERROR":
this.error = ObjLogger.prototype.error;
// eslint-disable-next-line no-fallthrough
case "FATAL":
this.fatal = ObjLogger.prototype.fatal;
// eslint-disable-next-line no-fallthrough
default:
break;
}
}

/**
* Additional output streams.
Expand Down Expand Up @@ -115,10 +154,6 @@ export class ObjLogger implements IObjectLogger {
if (this.ended)
throw new Error("Cannot write to the stream anymore.");

if (ObjLogger.levels.indexOf(level) > ObjLogger.levels.indexOf(this.logLevel)) {
return;
}

let paramsCopy;

if (optionalParams.length) {
Expand Down
1 change: 1 addition & 0 deletions packages/python-runner/hardcoded_magic_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ class RunnerMessageCodes(Enum):
KILL = 4002
MONITORING_RATE = 4003
MONITORING_REPLY = 4004
SET = 4005

EVENT = 5001
24 changes: 17 additions & 7 deletions packages/python-runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ async def main(self, server_host, server_port):
self.connect_stdio()
self.connect_log_stream()

config, args = await self.handshake()
config, args, log_level = await self.handshake()
self.logger.info('Communication established.')
asyncio.create_task(self.connect_control_stream())
asyncio.create_task(self.setup_heartbeat())

self.load_sequence()
await self.run_instance(config, args)
await self.run_instance(config, args, log_level)


async def init_connections(self, host, port):
Expand Down Expand Up @@ -130,6 +130,8 @@ async def handshake(self):
data['appConfig'] = {}
if 'args' not in data:
data['args'] = []
if 'logLevel' not in data:
data['logLevel'] = 'DEBUG'

self.logger.info(f'Sending PANG')
pang_requires_data = {
Expand All @@ -140,8 +142,7 @@ async def handshake(self):

if code == msg_codes.PONG.value:
self.logger.info(f'Got configuration: {data}')
return data['appConfig'], data['args']

return data['appConfig'], data['args'], data['logLevel']

async def connect_control_stream(self):
# Control stream carries ndjson, so it's enough to split into lines.
Expand All @@ -153,6 +154,8 @@ async def connect_control_stream(self):
)
async for code, data in control_messages:
self.logger.debug(f'Control message received: {code} {data}')
if code == msg_codes.SET.value:
self.handle_set(data)
if code == msg_codes.KILL.value:
self.exit_immediately()
if code == msg_codes.STOP.value:
Expand All @@ -161,6 +164,11 @@ async def connect_control_stream(self):
self.emitter.emit(data['eventName'], data['message'] if 'message' in data else None)


async def handle_set(self, data):
self.logger.info(f'Setting logLevel: {data}')
if 'logLevel' in data:
self.logger.setLevel(data['logLevel'])

async def handle_stop(self, data):
self.logger.info(f'Gracefully shutting down...{data}')
self.keep_alive_requested = False
Expand Down Expand Up @@ -203,12 +211,14 @@ def load_sequence(self):
# switch to sequence dir so that relative paths will work
os.chdir(os.path.dirname(self.seq_path))

async def run_instance(self, config, args):
async def run_instance(self, config, args, log_level):
context = AppContext(self, config)
input_stream = Stream()

asyncio.create_task(self.connect_input_stream(input_stream))

self.logger.info('Running instance...')
self.logger.setLevel(log_level)
result = self.sequence.run(context, input_stream, *args)

self.logger.info(f'Sending PANG')
Expand All @@ -223,7 +233,7 @@ async def run_instance(self, config, args):
produces = {}
produces['provides'] = produces_runtime
produces['contentType'] = getattr(result, 'content_type', None)
produces_json = json.dumps(produces)
produces_json = json.dumps(produces)

if produces:
self.logger.info(f'Sending PANG with {produces}')
Expand All @@ -238,7 +248,7 @@ async def run_instance(self, config, args):
consumes = {}
consumes['requires'] = consumes_runtime
consumes['contentType'] = getattr(result, 'content_type', None)
consumes_json = json.dumps(consumes)
consumes_json = json.dumps(consumes)

if consumes:
self.logger.info(f'Sending PANG with {consumes}')
Expand Down
7 changes: 4 additions & 3 deletions packages/runner/src/runner-app-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ObjLogger } from "@scramjet/obj-logger";
import {
EventMessageData, KeepAliveMessageData, MonitoringMessageFromRunnerData,
AppConfig, AppError, AppErrorConstructor, AppContext, WritableStream,
FunctionDefinition, KillHandler, StopHandler, MonitoringHandler, IObjectLogger, HostClient, ManagerClient
FunctionDefinition, KillHandler, StopHandler, MonitoringHandler, IObjectLogger, HostClient, ManagerClient, LogLevel
} from "@scramjet/types";
import { EventEmitter } from "events";

Expand Down Expand Up @@ -31,20 +31,21 @@ implements AppContext<AppConfigType, State> {
emitter: EventEmitter;
initialState?: State;
exitTimeout: number = 10000;
logger: IObjectLogger = new ObjLogger("Sequence");
logger: IObjectLogger;
hub: HostClient;
space: ManagerClient;
instanceId: string;

constructor(config: AppConfigType, monitorStream: WritableStream<any>,
emitter: EventEmitter, runner: RunnerProxy, hostClient: HostClient, spaceClient: ManagerClient, id: string) {
emitter: EventEmitter, runner: RunnerProxy, hostClient: HostClient, spaceClient: ManagerClient, id: string, logLevel: LogLevel) {
this.config = config;
this.monitorStream = monitorStream;
this.emitter = emitter;
this.runner = runner;
this.hub = hostClient;
this.space = spaceClient;
this.instanceId = id;
this.logger = new ObjLogger("Sequence", {}, logLevel);
}

private handleSave(_state: any): void {
Expand Down
Loading
Loading