diff --git a/genkit-tools/cli/src/utils/manager-utils.ts b/genkit-tools/cli/src/utils/manager-utils.ts index 36d2afa5b7..f324bf2cf9 100644 --- a/genkit-tools/cli/src/utils/manager-utils.ts +++ b/genkit-tools/cli/src/utils/manager-utils.ts @@ -15,6 +15,7 @@ */ import { + LocalFileLogStore, LocalFileTraceStore, startTelemetryServer, } from '@genkit-ai/telemetry-server'; @@ -48,6 +49,10 @@ export async function resolveTelemetryServer(options: { storeRoot: options.projectRoot, indexRoot: options.projectRoot, }), + logStore: new LocalFileLogStore({ + storeRoot: options.projectRoot, + indexRoot: options.projectRoot, + }), corsOrigin: options.corsOrigin, }); } diff --git a/genkit-tools/common/src/types/index.ts b/genkit-tools/common/src/types/index.ts index acc8b6a11b..8243fe2376 100644 --- a/genkit-tools/common/src/types/index.ts +++ b/genkit-tools/common/src/types/index.ts @@ -23,6 +23,7 @@ export * from './document'; export * from './env'; export * from './eval'; export * from './evaluator'; +export * from './log'; export * from './model'; export * from './prompt'; export * from './reflection'; diff --git a/genkit-tools/common/src/types/log.ts b/genkit-tools/common/src/types/log.ts new file mode 100644 index 0000000000..d5242bcc52 --- /dev/null +++ b/genkit-tools/common/src/types/log.ts @@ -0,0 +1,66 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { extendZodWithOpenApi } from '@asteasolutions/zod-to-openapi'; +import * as z from 'zod'; + +import { InstrumentationLibrarySchema } from './trace'; + +extendZodWithOpenApi(z); + +/** + * Zod schema for log record data. + */ +export const LogRecordSchema = z + .object({ + logId: z.string(), // Server-generated if omitted + traceId: z.string().optional(), + spanId: z.string().optional(), + timestamp: z + .number() + .describe('log recorded time in milliseconds since the epoch'), + severityNumber: z.number().optional(), + severityText: z.string().optional(), + body: z.unknown().optional(), // Represents any value: string, number, boolean, array, or object + attributes: z.record(z.string(), z.unknown()).optional(), + instrumentationLibrary: InstrumentationLibrarySchema.optional(), + }) + .openapi('LogRecordData'); + +export type LogRecordData = z.infer<typeof LogRecordSchema>; + +/** + * Log query parameters. + */ +export const LogQuerySchema = z.object({ + limit: z.coerce.number().optional(), + continuationToken: z.string().optional(), + traceId: z.string().optional(), + spanId: z.string().optional(), +}); + +export type LogQuery = z.infer<typeof LogQuerySchema>; + +export interface LogQueryResponse { + logs: LogRecordData[]; + continuationToken?: string; +} + +export interface LogStore { + init(): Promise<void>; + save(logs: LogRecordData[]): Promise<void>; + list(query?: LogQuery): Promise<LogQueryResponse>; +} diff --git a/genkit-tools/telemetry-server/src/file-log-store.ts b/genkit-tools/telemetry-server/src/file-log-store.ts new file mode 100644 index 0000000000..2f1564b035 --- /dev/null +++ b/genkit-tools/telemetry-server/src/file-log-store.ts @@ -0,0 +1,263 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + LogRecordData, + type LogQuery, + type LogQueryResponse, + type LogStore, +} from '@genkit-ai/tools-common'; +import { logger } from '@genkit-ai/tools-common/utils'; +import { Mutex } from 'async-mutex'; +import fs from 'fs'; +import lockfile from 'lockfile'; +import path from 'path'; + +function lockFile(file: string) { + return `${file}.lock`; +} + +export class LocalFileLogStore implements LogStore { + private readonly storeRoot: string; + private readonly indexRoot: string; + private readonly index: LogIndex; + private mutex = new Mutex(); + + constructor(options: { storeRoot: string; indexRoot: string }) { + this.storeRoot = path.resolve(options.storeRoot, '.genkit/logs'); + fs.mkdirSync(this.storeRoot, { recursive: true }); + this.indexRoot = path.resolve(options.indexRoot, '.genkit/logs_idx'); + fs.mkdirSync(this.indexRoot, { recursive: true }); + logger.debug( + `[Telemetry Server] initialized local file log store at root: ${this.storeRoot}` + ); + this.index = new LogIndex(this.indexRoot); + } + + async init(): Promise<void> { + // Indexes and store are append-only. 
} + + private getCurrentLogFile(): string { + const now = new Date(); + // format YYYY-MM-DD-HH + const pad = (n: number) => n.toString().padStart(2, '0'); + const dateStr = `${now.getFullYear()}-${pad(now.getMonth() + 1)}-${pad(now.getDate())}-${pad(now.getHours())}`; + return path.resolve(this.storeRoot, `logs-${dateStr}.jsonl`); + } + + async save(logs: LogRecordData[]): Promise<void> { + if (logs.length === 0) return; + + await this.mutex.runExclusive(() => { + const logFile = this.getCurrentLogFile(); + const relativeFileName = path.basename(logFile); + + let currentOffset = 0; + if (fs.existsSync(logFile)) { + currentOffset = fs.statSync(logFile).size; + } + + const indexEntries: LogIndexEntry[] = []; + let offsetTracker = currentOffset; + + const linesToAppend = logs.map((log) => { + // Ensure log id exists if not provided + if (!log.logId && log.traceId && log.spanId) { + log.logId = `${log.traceId}-${log.spanId}-${log.timestamp}-${Math.random().toString(36).substring(7)}`; + } else if (!log.logId) { + log.logId = `${log.timestamp}-${Math.random().toString(36).substring(7)}`; + } + const line = JSON.stringify(log) + '\n'; + const length = Buffer.byteLength(line, 'utf8'); + + indexEntries.push({ + traceId: log.traceId, + spanId: log.spanId, + timestamp: log.timestamp, + severityText: log.severityText, + severityNumber: log.severityNumber, + file: relativeFileName, + offset: offsetTracker, + length: length, + }); + + offsetTracker += length; + return line; + }); + + fs.appendFileSync(logFile, linesToAppend.join('')); + this.index.add(indexEntries); + }); + } + + async list(query?: LogQuery): Promise<LogQueryResponse> { + const startFromIndex = query?.continuationToken + ? Number.parseInt(query.continuationToken) + : 0; + const limit = query?.limit ?? 
100; + + const searchResult = this.index.search({ + limit, + startFromIndex, + traceId: query?.traceId, + spanId: query?.spanId, + }); + + const logs: LogRecordData[] = []; + + for (const entry of searchResult.entries) { + const logFile = path.resolve(this.storeRoot, entry.file); + if (fs.existsSync(logFile)) { + const buffer = Buffer.alloc(entry.length); + const fd = fs.openSync(logFile, 'r'); + fs.readSync(fd, buffer, 0, entry.length, entry.offset); + fs.closeSync(fd); + try { + logs.push( + JSON.parse(buffer.toString('utf8').trim()) as LogRecordData + ); + } catch (e) { + logger.error(`Error parsing log at ${entry.file}:${entry.offset}`); + } + } + } + + return { + logs, + continuationToken: searchResult.pageLastIndex + ? `${searchResult.pageLastIndex}` + : undefined, + }; + } +} + +export interface LogIndexEntry { + traceId?: string; + spanId?: string; + timestamp: number; + severityText?: string; + severityNumber?: number; + file: string; + offset: number; + length: number; +} + +export interface LogIndexSearchResult { + pageLastIndex?: number; + entries: LogIndexEntry[]; +} + +export class LogIndex { + private currentIndexFile: string; + + constructor(private indexRoot: string) { + this.currentIndexFile = path.resolve( + this.indexRoot, + this.newIndexFileName() + ); + fs.mkdirSync(this.indexRoot, { recursive: true }); + } + + private newIndexFileName() { + return `idx_${(Date.now() + '').padStart(17, '0')}.jsonl`; + } + + listIndexFiles() { + return fs.readdirSync(this.indexRoot).filter((f) => f.startsWith('idx_')); + } + + add(entries: LogIndexEntry[]) { + if (entries.length === 0) return; + try { + lockfile.lockSync(lockFile(this.currentIndexFile)); + const lines = entries.map((e) => JSON.stringify(e) + '\n').join(''); + fs.appendFileSync(this.currentIndexFile, lines); + } catch (err) { + logger.error( + `Failed to lock log index file ${this.currentIndexFile}: ${err}` + ); + } finally { + if (fs.existsSync(lockFile(this.currentIndexFile))) { + 
lockfile.unlockSync(lockFile(this.currentIndexFile)); + } + } + } + + search(query: { + limit: number; + startFromIndex: number; + traceId?: string; + spanId?: string; + }): LogIndexSearchResult { + const indexFiles = this.listIndexFiles().sort().reverse(); + + let skipped = 0; + const entries: LogIndexEntry[] = []; + let hasMore = false; + + for (const idxFile of indexFiles) { + if (hasMore) break; + + const idxTxt = fs.readFileSync( + path.resolve(this.indexRoot, idxFile), + 'utf8' + ); + const fileData = idxTxt + .split('\n') + .filter((l) => l.trim().length > 0) + .map((l) => { + try { + return JSON.parse(l) as LogIndexEntry; + } catch { + return undefined; + } + }) + .filter((d): d is LogIndexEntry => { + if (!d) return false; + if (query.traceId && d.traceId !== query.traceId) return false; + if (query.spanId && d.spanId !== query.spanId) return false; + return true; + }); + + fileData.sort((a, b) => b.timestamp - a.timestamp); // Newest first + + for (const entry of fileData) { + if (skipped < query.startFromIndex) { + skipped++; + continue; + } + + if (entries.length < query.limit) { + entries.push(entry); + } else { + hasMore = true; + break; + } + } + } + + const result: LogIndexSearchResult = { + entries, + }; + + if (hasMore) { + result.pageLastIndex = query.startFromIndex + query.limit; + } + + return result; + } +} diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index 682a34c4ae..322dea4db5 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +import type { LogStore } from '@genkit-ai/tools-common'; import { TraceDataSchema, TraceQueryFilterSchema, @@ -24,9 +25,10 @@ import cors from 'cors'; import express from 'express'; import type * as http from 'http'; import { BroadcastManager } from './broadcast-manager.js'; -import type { TraceStore } from './types'; -import { traceDataFromOtlp } from './utils/otlp'; +import type { TraceStore } from './types.js'; +import { logDataFromOtlp, traceDataFromOtlp } from './utils/otlp.js'; +export { LocalFileLogStore } from './file-log-store.js'; export { LocalFileTraceStore } from './file-trace-store.js'; export { TraceQuerySchema, type TraceQuery, type TraceStore } from './types'; @@ -39,6 +41,7 @@ const broadcastManager = new BroadcastManager(); export async function startTelemetryServer(params: { port: number; traceStore: TraceStore; + logStore: LogStore; /** * Controls the maximum request body size. If this is a number, * then the value specifies the number of bytes; if it is a string, @@ -50,6 +53,8 @@ export async function startTelemetryServer(params: { corsOrigin?: string | RegExp; }) { await params.traceStore.init(); + await params.logStore.init(); + const api = express(); // Allow all origins and expose trace ID header api.use( @@ -179,13 +184,68 @@ export async function startTelemetryServer(params: { } }); + api.get('/api/logs', async (request, response, next) => { + try { + const { limit, continuationToken } = request.query; + response.json( + await params.logStore.list({ + limit: limit ? Number.parseInt(limit.toString()) : 100, + continuationToken: continuationToken + ? continuationToken.toString() + : undefined, + }) + ); + } catch (e) { + next(e); + } + }); + + api.get('/api/traces/:traceId/logs', async (request, response, next) => { + try { + const { limit, continuationToken } = request.query; + const { traceId } = request.params; + response.json( + await params.logStore.list({ + limit: limit ? 
Number.parseInt(limit.toString()) : 100, + continuationToken: continuationToken + ? continuationToken.toString() + : undefined, + traceId, + }) + ); + } catch (e) { + next(e); + } + }); + + api.get('/api/spans/:spanId/logs', async (request, response, next) => { + try { + const { limit, continuationToken } = request.query; + const { spanId } = request.params; + response.json( + await params.logStore.list({ + limit: limit ? Number.parseInt(limit.toString()) : 100, + continuationToken: continuationToken + ? continuationToken.toString() + : undefined, + spanId, + }) + ); + } catch (e) { + next(e); + } + }); + api.post( '/api/otlp/:parentTraceId/:parentSpanId', async (request, response) => { try { const { parentTraceId, parentSpanId } = request.params; - if (!request.body.resourceSpans?.length) { + if ( + !request.body.resourceSpans?.length && + !request.body.resourceLogs?.length + ) { // Acknowledge and ignore empty payloads. response.status(200).json({}); return; @@ -202,6 +262,20 @@ export async function startTelemetryServer(params: { } await params.traceStore.save(parentTraceId, traceData); } + + if (request.body.resourceLogs?.length) { + const logs = logDataFromOtlp(request.body); + if (logs.length > 0) { + for (const log of logs) { + log.traceId = parentTraceId; + if (!log.spanId) { + log.spanId = parentSpanId; + } + } + await params.logStore.save(logs); + } + } + response.status(200).json({}); } catch (err) { logger.error(`Error processing OTLP payload: ${err}`); @@ -216,7 +290,10 @@ export async function startTelemetryServer(params: { api.post('/api/otlp', async (request, response) => { try { - if (!request.body.resourceSpans?.length) { + if ( + !request.body.resourceSpans?.length && + !request.body.resourceLogs?.length + ) { // Acknowledge and ignore empty payloads. 
response.status(200).json({}); return; @@ -240,6 +317,14 @@ export async function startTelemetryServer(params: { broadcastManager.broadcast(traceData.traceId, event); } } + + if (request.body.resourceLogs?.length) { + const logs = logDataFromOtlp(request.body); + if (logs.length > 0) { + await params.logStore.save(logs); + } + } + response.status(200).json({}); } catch (err) { logger.error(`Error processing OTLP payload: ${err}`); diff --git a/genkit-tools/telemetry-server/src/utils/otlp.ts b/genkit-tools/telemetry-server/src/utils/otlp.ts index af98cb68d1..f5ea1bea68 100644 --- a/genkit-tools/telemetry-server/src/utils/otlp.ts +++ b/genkit-tools/telemetry-server/src/utils/otlp.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { SpanData, TraceData } from '@genkit-ai/tools-common'; +import { LogRecordData, SpanData, TraceData } from '@genkit-ai/tools-common'; // These interfaces are based on the OTLP JSON format. // A full definition can be found at: @@ -70,8 +70,44 @@ interface OtlpResourceSpan { scopeSpans: OtlpScopeSpan[]; } -interface OtlpPayload { - resourceSpans: OtlpResourceSpan[]; +interface OtlpLogRecord { + timeUnixNano: string; + severityNumber?: number; + severityText?: string; + body?: OtlpValue; + attributes?: OtlpAttribute[]; + traceId?: string; + spanId?: string; +} + +interface OtlpScopeLog { + scope: { + name: string; + version: string; + }; + logRecords: OtlpLogRecord[]; +} + +interface OtlpResourceLog { + resource: { + attributes: OtlpAttribute[]; + droppedAttributesCount: number; + }; + scopeLogs: OtlpScopeLog[]; +} + +export interface OtlpPayload { + resourceSpans?: OtlpResourceSpan[]; + resourceLogs?: OtlpResourceLog[]; +} + +export function fromOtlpValue(value: OtlpValue): any { + if (value.stringValue !== undefined) return value.stringValue; + if (value.intValue !== undefined) return value.intValue; + if (value.boolValue !== undefined) return value.boolValue; + if (value.arrayValue !== undefined) + return 
value.arrayValue.values.map(fromOtlpValue); + return undefined; } function toMillis(nano: string): number { @@ -81,12 +117,9 @@ function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData { const attributes: Record<string, any> = {}; span.attributes.forEach((attr) => { - if (attr.value.stringValue) { - attributes[attr.key] = attr.value.stringValue; - } else if (attr.value.intValue) { - attributes[attr.key] = attr.value.intValue; - } else if (attr.value.boolValue) { - attributes[attr.key] = attr.value.boolValue; + const val = fromOtlpValue(attr.value); + if (val !== undefined) { + attributes[attr.key] = val; } }); @@ -141,22 +174,74 @@ export function traceDataFromOtlp(otlpData: OtlpPayload): TraceData[] { const traces: Record<string, TraceData> = {}; - otlpData.resourceSpans.forEach((resourceSpan) => { - resourceSpan.scopeSpans.forEach((scopeSpan) => { - scopeSpan.spans.forEach((span) => { - if (!traces[span.traceId]) { - traces[span.traceId] = { - traceId: span.traceId, - spans: {}, - }; - } - traces[span.traceId].spans[span.spanId] = toSpanData( - span, - scopeSpan.scope - ); + if (otlpData.resourceSpans) { + otlpData.resourceSpans.forEach((resourceSpan) => { + resourceSpan.scopeSpans.forEach((scopeSpan) => { + scopeSpan.spans.forEach((span) => { + if (!traces[span.traceId]) { + traces[span.traceId] = { + traceId: span.traceId, + spans: {}, + }; + } + traces[span.traceId].spans[span.spanId] = toSpanData( + span, + scopeSpan.scope + ); + }); }); }); - }); + } return Object.values(traces); } + +function toLogRecordData( + log: OtlpLogRecord, + scope: OtlpScopeLog['scope'] +): LogRecordData { + const attributes: Record<string, any> = {}; + if (log.attributes) { + log.attributes.forEach((attr) => { + const val = fromOtlpValue(attr.value); + if (val !== undefined) { + attributes[attr.key] = val; + } + }); + } + + let body: any = undefined; + if (log.body) { + body = 
fromOtlpValue(log.body); + } + + return { + logId: '', // Server will populate this if empty + traceId: log.traceId, + spanId: log.spanId, + timestamp: toMillis(log.timeUnixNano), + severityNumber: log.severityNumber, + severityText: log.severityText, + body, + attributes, + instrumentationLibrary: { + name: scope.name, + version: scope.version, + }, + }; +} + +export function logDataFromOtlp(otlpData: OtlpPayload): LogRecordData[] { + const logs: LogRecordData[] = []; + if (!otlpData.resourceLogs) return logs; + + otlpData.resourceLogs.forEach((resourceLog) => { + resourceLog.scopeLogs.forEach((scopeLog) => { + scopeLog.logRecords.forEach((log) => { + logs.push(toLogRecordData(log, scopeLog.scope)); + }); + }); + }); + + return logs; +} diff --git a/genkit-tools/telemetry-server/tests/file-log-store_test.ts b/genkit-tools/telemetry-server/tests/file-log-store_test.ts new file mode 100644 index 0000000000..f38677b416 --- /dev/null +++ b/genkit-tools/telemetry-server/tests/file-log-store_test.ts @@ -0,0 +1,128 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { LogRecordData } from '@genkit-ai/tools-common'; +import * as assert from 'assert'; +import * as fs from 'fs'; +import { afterEach, beforeEach, describe, it } from 'node:test'; +import * as path from 'path'; +import { LocalFileLogStore } from '../src/file-log-store'; + +describe('LocalFileLogStore', () => { + const testRoot = path.join(__dirname, '.test_log_store'); + const indexRoot = path.join(testRoot, 'idx'); + const storeRoot = path.join(testRoot, 'logs'); + let logStore: LocalFileLogStore; + + beforeEach(async () => { + if (fs.existsSync(testRoot)) { + fs.rmSync(testRoot, { recursive: true, force: true }); + } + logStore = new LocalFileLogStore({ storeRoot, indexRoot }); + await logStore.init(); + }); + + afterEach(() => { + if (fs.existsSync(testRoot)) { + fs.rmSync(testRoot, { recursive: true, force: true }); + } + }); + + it('should store logs and retrieve them via traceId lookup', async () => { + const mockLogs: LogRecordData[] = [ + { + logId: 'log-1', + traceId: 'trace-1', + spanId: 'span-1', + timestamp: 1000, + severityText: 'INFO', + body: 'First message', + }, + { + logId: 'log-2', + traceId: 'trace-2', + spanId: 'span-2', + timestamp: 2000, + severityText: 'ERROR', + body: 'Second message', + }, + { + logId: 'log-3', + traceId: 'trace-1', + spanId: 'span-3', + timestamp: 3000, + severityText: 'DEBUG', + body: 'Third message', + }, + ]; + + await logStore.save(mockLogs); + + const resultTrace1 = await logStore.list({ traceId: 'trace-1' }); + const resultTrace2 = await logStore.list({ traceId: 'trace-2' }); + + // Returns newest first based on the LogIndex implementation + assert.strictEqual(resultTrace1.logs.length, 2); + assert.strictEqual(resultTrace1.logs[0].logId, 'log-3'); + assert.strictEqual(resultTrace1.logs[1].logId, 'log-1'); + + assert.strictEqual(resultTrace2.logs.length, 1); + assert.strictEqual(resultTrace2.logs[0].logId, 'log-2'); + }); + + it('should retrieve logs by spanId', async () => { + const mockLogs: 
LogRecordData[] = [ + { + logId: 'log-1', + traceId: 'trace-1', + spanId: 'span-x', + timestamp: 1000, + body: 'hello', + }, + { + logId: 'log-2', + traceId: 'trace-1', + spanId: 'span-y', + timestamp: 2000, + body: 'world', + }, + ]; + + await logStore.save(mockLogs); + + const result = await logStore.list({ spanId: 'span-x' }); + assert.strictEqual(result.logs.length, 1); + assert.strictEqual(result.logs[0].logId, 'log-1'); + }); + + it('should populate logId if none belongs', async () => { + const mockLogs: LogRecordData[] = [ + { + traceId: 'trace-missing-id', + spanId: 'span-missing-id', + timestamp: 1234, + body: 'generated ID test', + logId: '', + }, + ]; + + await logStore.save(mockLogs); + + const result = await logStore.list({ traceId: 'trace-missing-id' }); + assert.strictEqual(result.logs.length, 1); + assert.ok(result.logs[0].logId.includes('trace-missing-id')); + }); +}); diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index 39329547f3..389d52a6f8 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -23,6 +23,7 @@ import os from 'os'; import path from 'path'; import { Index } from '../src/file-trace-store'; import { + LocalFileLogStore, LocalFileTraceStore, startTelemetryServer, stopTelemetryApi, @@ -61,6 +62,10 @@ describe('local-file-store', () => { storeRoot, indexRoot, }), + logStore: new LocalFileLogStore({ + storeRoot, + indexRoot, + }), }); }); @@ -1053,6 +1058,10 @@ describe('otlp-endpoint', () => { storeRoot, indexRoot, }), + logStore: new LocalFileLogStore({ + storeRoot, + indexRoot, + }), }); }); @@ -1198,6 +1207,10 @@ describe('otlp-endpoint (with parent)', () => { storeRoot, indexRoot, }), + logStore: new LocalFileLogStore({ + storeRoot, + indexRoot, + }), }); }); diff --git a/genkit-tools/telemetry-server/tests/otlp_test.ts b/genkit-tools/telemetry-server/tests/otlp_test.ts index 
d1af719c4c..cce9af766a 100644 --- a/genkit-tools/telemetry-server/tests/otlp_test.ts +++ b/genkit-tools/telemetry-server/tests/otlp_test.ts @@ -16,7 +16,7 @@ import * as assert from 'assert'; import { describe, it } from 'node:test'; -import { traceDataFromOtlp } from '../src/utils/otlp'; +import { logDataFromOtlp, traceDataFromOtlp } from '../src/utils/otlp'; describe('otlp-traces', () => { it('should transform OTLP payload to TraceData', () => { @@ -214,3 +214,84 @@ describe('otlp-traces', () => { assert.deepStrictEqual(result, expectedTraceData); }); }); + +describe('otlp-logs', () => { + it('should transform OTLP log payload to LogRecordData', () => { + const otlpPayload = { + resourceLogs: [ + { + resource: { + attributes: [], + droppedAttributesCount: 0, + }, + scopeLogs: [ + { + scope: { name: 'genkit-tracer', version: 'v1' }, + logRecords: [ + { + timeUnixNano: '1760827335359000000', + severityText: 'INFO', + severityNumber: 9, + body: { stringValue: 'This is a test log message' }, + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + attributes: [ + { + key: 'genkit:name', + value: { stringValue: 'generateContentStream' }, + }, + ], + }, + { + timeUnixNano: '1760827336695073000', + severityText: 'ERROR', + severityNumber: 17, + body: { stringValue: 'An error occurred' }, + traceId: 'c5892692eb25cce482eb13587b73c425', + }, + ], + }, + ], + }, + ], + }; + + const expectedLogData = [ + { + logId: '', + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: '86dc3d35cc11e336', + timestamp: 1760827335359, + severityNumber: 9, + severityText: 'INFO', + body: 'This is a test log message', + attributes: { + 'genkit:name': 'generateContentStream', + }, + instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + }, + { + logId: '', + traceId: 'c5892692eb25cce482eb13587b73c425', + spanId: undefined, + timestamp: 1760827336695, + severityNumber: 17, + severityText: 'ERROR', + body: 'An error occurred', + attributes: {}, + 
instrumentationLibrary: { + name: 'genkit-tracer', + version: 'v1', + }, + }, + ]; + + const result = logDataFromOtlp(otlpPayload); + // Overwrite the random logId generator for deep strict equal + result.forEach((log) => (log.logId = '')); + assert.deepStrictEqual(result, expectedLogData); + }); +}); diff --git a/js/core/package.json b/js/core/package.json index 319838c50f..65d4cf7aac 100644 --- a/js/core/package.json +++ b/js/core/package.json @@ -27,10 +27,12 @@ "license": "Apache-2.0", "dependencies": { "@opentelemetry/api": "^1.9.0", + "@opentelemetry/api-logs": "^0.52.0", "@opentelemetry/context-async-hooks": "~1.25.0", "@opentelemetry/core": "~1.25.0", "@opentelemetry/sdk-metrics": "~1.25.0", "@opentelemetry/sdk-node": "^0.52.0", + "@opentelemetry/sdk-logs": "^0.52.0", "@opentelemetry/sdk-trace-base": "~1.25.0", "@types/json-schema": "^7.0.15", "ajv": "^8.12.0", diff --git a/js/core/src/logging.ts b/js/core/src/logging.ts index 284fc56143..00639ed669 100644 --- a/js/core/src/logging.ts +++ b/js/core/src/logging.ts @@ -14,6 +14,8 @@ * limitations under the License. 
 */ +import { logs, SeverityNumber } from '@opentelemetry/api-logs'; + const LOG_LEVELS = ['debug', 'info', 'warn', 'error']; const loggerKey = '__genkit_logger'; @@ -47,6 +49,64 @@ function getLogger() { class Logger { readonly defaultLogger = _defaultLogger; + private _emitOtel( + level: string, + args: any[], + explicitBody?: string, + explicitAttributes?: Record<string, any> + ) { + try { + const otelLogger = logs.getLogger('genkit-logger'); + let severityNumber: SeverityNumber; + switch (level) { + case 'debug': + severityNumber = SeverityNumber.DEBUG; + break; + case 'info': + severityNumber = SeverityNumber.INFO; + break; + case 'warn': + severityNumber = SeverityNumber.WARN; + break; + case 'error': + severityNumber = SeverityNumber.ERROR; + break; + default: + severityNumber = SeverityNumber.UNSPECIFIED; + break; + } + + let body; + const attributes: Record<string, any> = explicitAttributes || {}; + if (explicitBody !== undefined) { + body = explicitBody; + } else if (args.length === 1 && typeof args[0] === 'string') { + body = args[0]; + } else { + const util = require('util'); + body = util.format(...args); + } + + let activeContext; + try { + const { context } = require('@opentelemetry/api'); + activeContext = context.active(); + } catch (e) { + // No-op if @opentelemetry/api trace is uninitialized or missing right now + } + + otelLogger.emit({ + severityNumber, + severityText: level.toUpperCase(), + body, + attributes, + ...(activeContext ? 
{ context: activeContext } : {}), + }); + } catch (err) { + // safe ignore + } + } + init(fn: any) { global[loggerKey] = fn; } @@ -54,18 +114,22 @@ class Logger { info(...args: any) { // eslint-disable-next-line prefer-spread getLogger().info.apply(getLogger(), args); + this._emitOtel('info', args); } debug(...args: any) { // eslint-disable-next-line prefer-spread getLogger().debug.apply(getLogger(), args); + this._emitOtel('debug', args); } error(...args: any) { // eslint-disable-next-line prefer-spread getLogger().error.apply(getLogger(), args); + this._emitOtel('error', args); } warn(...args: any) { // eslint-disable-next-line prefer-spread getLogger().warn.apply(getLogger(), args); + this._emitOtel('warn', args); } setLogLevel(level: 'error' | 'warn' | 'info' | 'debug') { @@ -74,10 +138,12 @@ class Logger { logStructured(msg: string, metadata: any) { getLogger().info(msg, metadata); + this._emitOtel('info', [], msg, metadata); } logStructuredError(msg: string, metadata: any) { getLogger().error(msg, metadata); + this._emitOtel('error', [], msg, metadata); } } diff --git a/js/core/src/tracing/exporter.ts b/js/core/src/tracing/exporter.ts index a92a40a5f5..ca56190854 100644 --- a/js/core/src/tracing/exporter.ts +++ b/js/core/src/tracing/exporter.ts @@ -21,6 +21,10 @@ import { suppressTracing, type ExportResult, } from '@opentelemetry/core'; +import type { + LogRecordExporter, + ReadableLogRecord, +} from '@opentelemetry/sdk-logs'; import type { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base'; import { logger } from '../logging.js'; import { deleteUndefinedProps } from '../utils.js'; @@ -178,3 +182,103 @@ export class TraceServerExporter implements SpanExporter { function transformTime(time: HrTime) { return hrTimeToMilliseconds(time); } + +/** + * Exports collected OpenTelemetry logs to the telemetry server. 
+ */ +export class LogServerExporter implements LogRecordExporter { + export( + logs: ReadableLogRecord[], + resultCallback: (result: ExportResult) => void + ): void { + this._sendLogs(logs, resultCallback); + } + + shutdown(): Promise<void> { + return this.forceFlush(); + } + + forceFlush(): Promise<void> { + return Promise.resolve(); + } + + private async _sendLogs( + logs: ReadableLogRecord[], + done?: (result: ExportResult) => void + ): Promise<void> { + if (!telemetryServerUrl) { + if (done) done({ code: ExportResultCode.SUCCESS }); + return; + } + + try { + const scopeLogsMap = new Map(); + for (const log of logs) { + const scopeName = log.instrumentationScope.name || 'unknown'; + if (!scopeLogsMap.has(scopeName)) { + scopeLogsMap.set(scopeName, { + scope: { + name: scopeName, + version: log.instrumentationScope.version || '', + }, + logRecords: [], + }); + } + + const attributes: any[] = []; + for (const [k, v] of Object.entries(log.attributes)) { + if (typeof v === 'string') + attributes.push({ key: k, value: { stringValue: v } }); + else if (typeof v === 'number') + attributes.push({ key: k, value: { intValue: v } }); + else if (typeof v === 'boolean') + attributes.push({ key: k, value: { boolValue: v } }); + } + + let bodyValue; + if (typeof log.body === 'string') bodyValue = { stringValue: log.body }; + else if (typeof log.body === 'number') + bodyValue = { intValue: log.body }; + else if (typeof log.body === 'boolean') + bodyValue = { boolValue: log.body }; + else bodyValue = { stringValue: JSON.stringify(log.body) }; + + scopeLogsMap.get(scopeName).logRecords.push({ + timeUnixNano: ( + hrTimeToMilliseconds(log.hrTime) * 1_000_000 + ).toString(), + severityNumber: log.severityNumber, + severityText: log.severityText, + body: bodyValue, + attributes, + traceId: log.spanContext?.traceId, + spanId: log.spanContext?.spanId, + }); + } + + const payload = { + resourceLogs: [ + { + resource: { attributes: [], droppedAttributesCount: 0 }, + scopeLogs: 
Array.from(scopeLogsMap.values()), + }, + ], + }; + + await context.with(suppressTracing(context.active()), () => + fetch(`${telemetryServerUrl}/api/otlp`, { + method: 'POST', + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json', + }, + body: JSON.stringify(payload), + }) + ); + if (done) done({ code: ExportResultCode.SUCCESS }); + } catch (e) { + logger.error('Failed to export logs', e); + if (done) done({ code: ExportResultCode.FAILED }); + } + } +} diff --git a/js/core/src/tracing/node-telemetry-provider.ts b/js/core/src/tracing/node-telemetry-provider.ts index 08a11d1eaa..a17d242330 100644 --- a/js/core/src/tracing/node-telemetry-provider.ts +++ b/js/core/src/tracing/node-telemetry-provider.ts @@ -14,6 +14,11 @@ * limitations under the License. */ +import { + BatchLogRecordProcessor, + SimpleLogRecordProcessor, + type LogRecordProcessor, +} from '@opentelemetry/sdk-logs'; import { NodeSDK } from '@opentelemetry/sdk-node'; import { BatchSpanProcessor, @@ -24,7 +29,11 @@ import { logger } from '../logging.js'; import type { TelemetryConfig } from '../telemetryTypes.js'; import { setTelemetryProvider } from '../tracing.js'; import { isDevEnv } from '../utils.js'; -import { TraceServerExporter, setTelemetryServerUrl } from './exporter.js'; +import { + LogServerExporter, + TraceServerExporter, + setTelemetryServerUrl, +} from './exporter.js'; import { RealtimeSpanProcessor } from './realtime-span-processor.js'; let telemetrySDK: NodeSDK | null = null; @@ -66,6 +75,17 @@ async function enableTelemetry( delete nodeOtelConfig.spanProcessor; } nodeOtelConfig.spanProcessors = processors; + + // Add LogRecordProcessors + const enableRealTimeTelemetry = + process.env.GENKIT_ENABLE_REALTIME_TELEMETRY === 'true'; + const logExporter = new LogServerExporter(); + const logProcessor: LogRecordProcessor = + isDevEnv() || enableRealTimeTelemetry + ? 
new SimpleLogRecordProcessor(logExporter) + : new BatchLogRecordProcessor(logExporter); + nodeOtelConfig.logRecordProcessor = logProcessor; + telemetrySDK = new NodeSDK(nodeOtelConfig); telemetrySDK.start(); process.on('SIGTERM', async () => await cleanUpTracing()); @@ -110,10 +130,15 @@ function maybeFlushMetrics(): Promise { } /** - * Flushes all configured span processors. + * Flushes all configured span and log processors. */ async function flushTracing() { + const promises: Promise[] = []; if (nodeOtelConfig?.spanProcessors) { - await Promise.all(nodeOtelConfig.spanProcessors.map((p) => p.forceFlush())); + promises.push(...nodeOtelConfig.spanProcessors.map((p) => p.forceFlush())); + } + if (nodeOtelConfig?.logRecordProcessor) { + promises.push(nodeOtelConfig.logRecordProcessor.forceFlush()); } + await Promise.all(promises); } diff --git a/js/pnpm-lock.yaml b/js/pnpm-lock.yaml index 5b761b6bdc..95262adb7d 100644 --- a/js/pnpm-lock.yaml +++ b/js/pnpm-lock.yaml @@ -102,12 +102,18 @@ importers: '@opentelemetry/api': specifier: ^1.9.0 version: 1.9.0 + '@opentelemetry/api-logs': + specifier: ^0.52.0 + version: 0.52.1 '@opentelemetry/context-async-hooks': specifier: ~1.25.0 version: 1.25.1(@opentelemetry/api@1.9.0) '@opentelemetry/core': specifier: ~1.25.0 version: 1.25.1(@opentelemetry/api@1.9.0) + '@opentelemetry/sdk-logs': + specifier: ^0.52.0 + version: 0.52.1(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-metrics': specifier: ~1.25.0 version: 1.25.1(@opentelemetry/api@1.9.0) diff --git a/js/testapps/flow-simple-ai/src/index.ts b/js/testapps/flow-simple-ai/src/index.ts index 78c729a317..b12e16b3ac 100644 --- a/js/testapps/flow-simple-ai/src/index.ts +++ b/js/testapps/flow-simple-ai/src/index.ts @@ -767,6 +767,9 @@ ai.defineModel( ); const blockingMiddleware: ModelMiddleware = async (req, next) => { + logger.warn( + 'blockingMiddleware invoked: request blocked due to policy violation.' 
+ ); return { finishReason: 'blocked', finishMessage: `Model input violated policies: further processing blocked.`,