Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 17 additions & 49 deletions genkit-tools/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
import type { RuntimeManager } from '@genkit-ai/tools-common/manager';
import { startServer } from '@genkit-ai/tools-common/server';
import { findProjectRoot, logger } from '@genkit-ai/tools-common/utils';
import { spawn } from 'child_process';
import { Command } from 'commander';
import getPort, { makeRange } from 'get-port';
import open from 'open';
import { startManager } from '../utils/manager-utils';
import { startDevProcessManager, startManager } from '../utils/manager-utils';

interface RunOptions {
noui?: boolean;
Expand All @@ -44,10 +43,20 @@ export const start = new Command('start')
);
}
// Always start the manager.
let managerPromise: Promise<RuntimeManager> = startManager(
projectRoot,
true
);
let manager: RuntimeManager;
let processPromise: Promise<void> | undefined;
if (start.args.length > 0) {
const result = await startDevProcessManager(
projectRoot,
start.args[0],
start.args.slice(1)
);
manager = result.manager;
processPromise = result.processPromise;
} else {
manager = await startManager(projectRoot, true);
processPromise = new Promise(() => {});
}
if (!options.noui) {
let port: number;
if (options.port) {
Expand All @@ -59,51 +68,10 @@ export const start = new Command('start')
} else {
port = await getPort({ port: makeRange(4000, 4099) });
}
managerPromise = managerPromise.then((manager) => {
startServer(manager, port);
return manager;
});
startServer(manager, port);
if (options.open) {
open(`http://localhost:${port}`);
}
}
await managerPromise.then((manager: RuntimeManager) => {
const telemetryServerUrl = manager?.telemetryServerUrl;
return startRuntime(telemetryServerUrl);
});
await processPromise;
});

async function startRuntime(telemetryServerUrl?: string) {
if (start.args.length > 0) {
return new Promise((urlResolver, reject) => {
const appProcess = spawn(start.args[0], start.args.slice(1), {
env: {
...process.env,
GENKIT_TELEMETRY_SERVER: telemetryServerUrl,
GENKIT_ENV: 'dev',
},
shell: process.platform === 'win32',
});

const originalStdIn = process.stdin;
appProcess.stderr?.pipe(process.stderr);
appProcess.stdout?.pipe(process.stdout);
process.stdin?.pipe(appProcess.stdin);

appProcess.on('error', (error): void => {
logger.error(`Error in app process: ${error}`);
reject(error);
process.exitCode = 1;
});
appProcess.on('exit', (code) => {
process.stdin?.pipe(originalStdIn);
if (code === 0) {
urlResolver(undefined);
} else {
reject(new Error(`app process exited with code ${code}`));
}
});
});
}
return new Promise(() => {}); // no runtime, return a hanging promise.
}
21 changes: 21 additions & 0 deletions genkit-tools/cli/src/utils/manager-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from '@genkit-ai/telemetry-server';
import type { Status } from '@genkit-ai/tools-common';
import {
ProcessManager,
RuntimeManager,
type GenkitToolsError,
} from '@genkit-ai/tools-common/manager';
Expand Down Expand Up @@ -65,6 +66,26 @@ export async function startManager(
return manager;
}

export async function startDevProcessManager(
projectRoot: string,
command: string,
args: string[]
): Promise<{ manager: RuntimeManager; processPromise: Promise<void> }> {
const telemetryServerUrl = await resolveTelemetryServer(projectRoot);
const processManager = new ProcessManager(command, args, {
GENKIT_TELEMETRY_SERVER: telemetryServerUrl,
GENKIT_ENV: 'dev',
});
const manager = await RuntimeManager.create({
telemetryServerUrl,
manageHealth: true,
projectRoot,
processManager,
});
const processPromise = processManager.start();
return { manager, processPromise };
}

/**
* Runs the given function with a runtime manager.
*/
Expand Down
6 changes: 6 additions & 0 deletions genkit-tools/common/src/manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@
*/

export { RuntimeManager } from './manager';
export {
AppProcessStatus,
ProcessManager,
ProcessManagerStartOptions,
ProcessStatus,
} from './process-manager';
export * from './types';
14 changes: 11 additions & 3 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
retriable,
type DevToolsInfo,
} from '../utils/utils';
import { ProcessManager } from './process-manager';
import {
GenkitToolsError,
RuntimeEvent,
Expand All @@ -57,9 +58,12 @@ interface RuntimeManagerOptions {
manageHealth?: boolean;
/** Project root dir. If not provided will be inferred from CWD. */
projectRoot: string;
/** An optional process manager for the main application process. */
processManager?: ProcessManager;
}

export class RuntimeManager {
readonly processManager?: ProcessManager;
private filenameToRuntimeMap: Record<string, RuntimeInfo> = {};
private filenameToDevUiMap: Record<string, DevToolsInfo> = {};
private idToFileMap: Record<string, string> = {};
Expand All @@ -68,8 +72,11 @@ export class RuntimeManager {
private constructor(
readonly telemetryServerUrl: string | undefined,
private manageHealth: boolean,
readonly projectRoot: string
) {}
readonly projectRoot: string,
processManager?: ProcessManager
) {
this.processManager = processManager;
}

/**
* Creates a new runtime manager.
Expand All @@ -78,7 +85,8 @@ export class RuntimeManager {
const manager = new RuntimeManager(
options.telemetryServerUrl,
options.manageHealth ?? true,
options.projectRoot
options.projectRoot,
options.processManager
);
await manager.setupRuntimesWatcher();
await manager.setupDevUiWatcher();
Expand Down
142 changes: 142 additions & 0 deletions genkit-tools/common/src/manager/process-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ChildProcess, spawn } from 'child_process';
import terminate from 'terminate';
import { logger } from '../utils';

export type ProcessStatus = 'running' | 'stopped' | 'unconfigured';

export interface AppProcessStatus {
status: ProcessStatus;
}

export interface ProcessManagerStartOptions {
nonInteractive?: boolean;
}

/**
* Manages a child process.
*/
export class ProcessManager {
private appProcess?: ChildProcess;
private originalStdIn?: NodeJS.ReadStream;
private _status: ProcessStatus = 'stopped';
private manualRestart = false;

constructor(
private readonly command: string,
private readonly args: string[],
private readonly env: NodeJS.ProcessEnv = {}
) {}

/**
* Starts the process.
*/
start(options?: ProcessManagerStartOptions): Promise<void> {
return new Promise((resolve, reject) => {
this._status = 'running';
this.appProcess = spawn(this.command, this.args, {
env: {
...process.env,
...this.env,
},
shell: process.platform === 'win32',
});

if (!options?.nonInteractive) {
this.originalStdIn = process.stdin;
this.appProcess.stderr?.pipe(process.stderr);
this.appProcess.stdout?.pipe(process.stdout);
process.stdin?.pipe(this.appProcess.stdin!);
}

this.appProcess.on('error', (error): void => {
logger.error(`Error in app process: ${error}`);
this.cleanup();
reject(error);
});

this.appProcess.on('exit', (code, signal) => {
this.cleanup();
if (this.manualRestart) {
this.manualRestart = false;
return;
}
// If the process was killed by a signal, it's not an error in this context.
if (code === 0 || signal) {
resolve();
} else {
reject(new Error(`app process exited with code ${code}`));
}
});
});
}

/**
* Kills the currently-running process and starts a new one.
*/
async restart(options?: ProcessManagerStartOptions): Promise<void> {
this.manualRestart = true;
await this.kill();
this.start(options).catch(() => {});
}

/**
* Kills the currently-running process.
*/
kill(): Promise<void> {
return new Promise((resolve) => {
if (!this.appProcess || !this.appProcess.pid || this.appProcess.killed) {
this._status = 'stopped';
resolve();
return;
}

// The 'exit' listener is set up in start() and will handle cleanup.
this.appProcess.on('exit', () => {
resolve();
});

terminate(this.appProcess.pid, 'SIGTERM', (err) => {
if (err) {
// This can happen if the process is already gone, which is fine.
logger.debug(`Error during process termination: ${err.message}`);
}
resolve();
});
});
}

status(): AppProcessStatus {
return {
status: this._status,
};
}

private cleanup() {
if (this.originalStdIn) {
process.stdin.unpipe(this.appProcess?.stdin!);
this.originalStdIn = undefined;
}
if (this.appProcess) {
this.appProcess.stdout?.unpipe(process.stdout);
this.appProcess.stderr?.unpipe(process.stderr);
}
this.appProcess = undefined;
this._status = 'stopped';
}
}
18 changes: 18 additions & 0 deletions genkit-tools/common/src/server/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
validateSchema,
} from '../eval';
import type { RuntimeManager } from '../manager/manager';
import { AppProcessStatus } from '../manager/process-manager';
import { GenkitToolsError, type RuntimeInfo } from '../manager/types';
import type { Action } from '../types/action';
import * as apis from '../types/apis';
Expand Down Expand Up @@ -301,6 +302,23 @@ export const TOOLS_SERVER_ROUTER = (manager: RuntimeManager) =>
getActiveRuntimes: t.procedure.query(() => {
return manager.listRuntimes();
}),

getAppProcessStatus: t.procedure.query((): AppProcessStatus => {
if (!manager.processManager) {
return { status: 'unconfigured' };
}
return manager.processManager.status();
}),

restartAppProcess: t.procedure.query(async () => {
await manager.processManager?.restart();
return true;
}),

killAppProcess: t.procedure.query(async () => {
await manager.processManager?.kill();
return true;
}),
});

export type ToolsServerRouter = ReturnType<typeof TOOLS_SERVER_ROUTER>;
Loading
Loading