From d11b9c02cb10c631e188bfdef355bb69ba84c93d Mon Sep 17 00:00:00 2001 From: Jayadeep Bejoy Date: Tue, 16 Dec 2025 23:19:50 +0400 Subject: [PATCH 1/4] feat(ai_stream) Signed-off-by: Jayadeep Bejoy --- .gitignore | 3 + proto/event/v1/event.proto | 36 ++- src/__tests__/unit/zod/event.test.ts | 16 +- src/events/AIEvents/AITokenUsage.ts | 27 +++ src/factory/StorageAdapterFactory.ts | 3 + src/gen/event/v1/event_connect.ts | 13 +- src/gen/event/v1/event_pb.ts | 139 +++++++++++- src/interface/event/Event.ts | 22 +- src/routes/gRPC/events/registerEvent.ts | 96 +++----- src/routes/gRPC/events/streamEvents.ts | 142 ++++++++++++ src/server.ts | 2 + .../postgres/handlers/addAiTokenUsage.ts | 210 ++++++++++++++++++ .../adapter/postgres/handlers/index.ts | 1 + src/storage/adapter/postgres/postgres.ts | 8 + src/storage/db/postgres/schema.ts | 25 +++ src/utils/eventHelpers.ts | 96 ++++++++ src/zod/event.ts | 107 ++++++++- 17 files changed, 867 insertions(+), 79 deletions(-) create mode 100644 src/events/AIEvents/AITokenUsage.ts create mode 100644 src/routes/gRPC/events/streamEvents.ts create mode 100644 src/storage/adapter/postgres/handlers/addAiTokenUsage.ts create mode 100644 src/utils/eventHelpers.ts diff --git a/.gitignore b/.gitignore index 6cc4c21..7b2fa09 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ dist coverage *.lcov +# local testing +test-stream-events.ts + # logs logs _.log diff --git a/proto/event/v1/event.proto b/proto/event/v1/event.proto index d63ef64..37e0ca2 100644 --- a/proto/event/v1/event.proto +++ b/proto/event/v1/event.proto @@ -5,11 +5,15 @@ package event.v1; service EventService { // RegisterEvent registers an event as being done by a user rpc RegisterEvent(RegisterEventRequest) returns (RegisterEventResponse) {} + + // StreamEvents streams events from client to server (e.g., AI token usage) + rpc StreamEvents(stream StreamEventRequest) returns (StreamEventResponse) {} } enum EventType { EVENT_TYPE_UNSPECIFIED = 0; SDK_CALL = 1; + 
AI_TOKEN_USAGE = 2; } enum SDKCallType { @@ -28,7 +32,7 @@ message RegisterEventRequest { message SDKCall { SDKCallType sdkCallType = 1; - + oneof debit { float amount = 2; string tag = 3; @@ -38,3 +42,33 @@ message SDKCall { message RegisterEventResponse { string random = 1; } + +message StreamEventRequest { + EventType type = 1; + string userId = 2; + oneof data { + SDKCall sdkCall = 3; + AITokenUsage aiTokenUsage = 4; + } +} + +message AITokenUsage { + string model = 1; + int32 inputTokens = 2; + int32 outputTokens = 3; + + oneof inputDebit { + float inputAmount = 4; + string inputTag = 5; + } + + oneof outputDebit { + float outputAmount = 6; + string outputTag = 7; + } +} + +message StreamEventResponse { + int32 eventsProcessed = 1; + string message = 2; +} diff --git a/src/__tests__/unit/zod/event.test.ts b/src/__tests__/unit/zod/event.test.ts index 3ebefa1..483b545 100644 --- a/src/__tests__/unit/zod/event.test.ts +++ b/src/__tests__/unit/zod/event.test.ts @@ -23,8 +23,10 @@ describe("eventSchema", () => { if (result.success) { expect(result.data.type).toBe("SDK_CALL"); expect(result.data.userId).toBe("550e8400-e29b-41d4-a716-446655440000"); - expect(result.data.data.sdkCallType).toBe("RAW"); - expect(result.data.data.debitAmount).toBe(1050); + if (result.data.type === "SDK_CALL") { + expect(result.data.data.sdkCallType).toBe("RAW"); + expect(result.data.data.debitAmount).toBe(1050); + } } }); @@ -48,8 +50,10 @@ describe("eventSchema", () => { expect(result.success).toBe(true); if (result.success) { expect(result.data.type).toBe("SDK_CALL"); - expect(result.data.data.sdkCallType).toBe("MIDDLEWARE_CALL"); - expect(result.data.data.debitAmount).toBe(2599); + if (result.data.type === "SDK_CALL") { + expect(result.data.data.sdkCallType).toBe("MIDDLEWARE_CALL"); + expect(result.data.data.debitAmount).toBe(2599); + } } }); @@ -71,7 +75,7 @@ describe("eventSchema", () => { const result = await eventSchema.safeParseAsync(validEvent); 
expect(result.success).toBe(true); - if (result.success) { + if (result.success && result.data.type === "SDK_CALL") { expect(result.data.data.debitAmount).toBe(12345); } }); @@ -94,7 +98,7 @@ describe("eventSchema", () => { const result = await eventSchema.safeParseAsync(validEvent); expect(result.success).toBe(true); - if (result.success) { + if (result.success && result.data.type === "SDK_CALL") { expect(result.data.data).not.toHaveProperty("case"); expect(result.data.data).toHaveProperty("sdkCallType"); expect(result.data.data).toHaveProperty("debitAmount"); diff --git a/src/events/AIEvents/AITokenUsage.ts b/src/events/AIEvents/AITokenUsage.ts new file mode 100644 index 0000000..939110b --- /dev/null +++ b/src/events/AIEvents/AITokenUsage.ts @@ -0,0 +1,27 @@ +import type { AITokenUsageEventType } from "../../interface/event/Event"; +import { DateTime } from "luxon"; +import type { EventUnion } from "../../interface/event/Event"; +import { type UserId } from "../../config/identifiers"; + +export class AITokenUsage implements AITokenUsageEventType { + public reported_timestamp: DateTime; + public readonly type = "AI_TOKEN_USAGE" as const; + + constructor( + public userId: UserId, + public data: EventUnion<"AI_TOKEN_USAGE">, + ) { + this.reported_timestamp = DateTime.utc(); + } + + serialize() { + return { + SQL: { + type: this.type, + userId: this.userId, + reported_timestamp: this.reported_timestamp, + data: this.data, + }, + }; + } +} diff --git a/src/factory/StorageAdapterFactory.ts b/src/factory/StorageAdapterFactory.ts index e413cda..9e9874a 100644 --- a/src/factory/StorageAdapterFactory.ts +++ b/src/factory/StorageAdapterFactory.ts @@ -20,6 +20,9 @@ export class StorageAdapterFactory { case "SDK_CALL": { return new PostgresAdapter(event, apiKeyId); } + case "AI_TOKEN_USAGE": { + return new PostgresAdapter(event, apiKeyId); + } case "PAYMENT": { return new PostgresAdapter(event, apiKeyId); } diff --git a/src/gen/event/v1/event_connect.ts 
b/src/gen/event/v1/event_connect.ts index 2e04fbb..e0b14ef 100644 --- a/src/gen/event/v1/event_connect.ts +++ b/src/gen/event/v1/event_connect.ts @@ -3,7 +3,7 @@ /* eslint-disable */ // @ts-nocheck -import { RegisterEventRequest, RegisterEventResponse } from "./event_pb.js"; +import { RegisterEventRequest, RegisterEventResponse, StreamEventRequest, StreamEventResponse } from "./event_pb.js"; import { MethodKind } from "@bufbuild/protobuf"; /** @@ -23,6 +23,17 @@ export const EventService = { O: RegisterEventResponse, kind: MethodKind.Unary, }, + /** + * StreamEvents streams events from client to server (e.g., AI token usage) + * + * @generated from rpc event.v1.EventService.StreamEvents + */ + streamEvents: { + name: "StreamEvents", + I: StreamEventRequest, + O: StreamEventResponse, + kind: MethodKind.ClientStreaming, + }, } } as const; diff --git a/src/gen/event/v1/event_pb.ts b/src/gen/event/v1/event_pb.ts index d6363c5..aded307 100644 --- a/src/gen/event/v1/event_pb.ts +++ b/src/gen/event/v1/event_pb.ts @@ -10,7 +10,7 @@ import type { Message } from "@bufbuild/protobuf"; * Describes the file event/v1/event.proto. 
*/ export const file_event_v1_event: GenFile = /*@__PURE__*/ - fileDesc("ChRldmVudC92MS9ldmVudC5wcm90bxIIZXZlbnQudjEidwoUUmVnaXN0ZXJFdmVudFJlcXVlc3QSIQoEdHlwZRgBIAEoDjITLmV2ZW50LnYxLkV2ZW50VHlwZRIOCgZ1c2VySWQYAiABKAkSJAoHc2RrQ2FsbBgDIAEoCzIRLmV2ZW50LnYxLlNES0NhbGxIAEIGCgRkYXRhIl8KB1NES0NhbGwSKgoLc2RrQ2FsbFR5cGUYASABKA4yFS5ldmVudC52MS5TREtDYWxsVHlwZRIQCgZhbW91bnQYAiABKAJIABINCgN0YWcYAyABKAlIAEIHCgVkZWJpdCInChVSZWdpc3RlckV2ZW50UmVzcG9uc2USDgoGcmFuZG9tGAEgASgJKjUKCUV2ZW50VHlwZRIaChZFVkVOVF9UWVBFX1VOU1BFQ0lGSUVEEAASDAoIU0RLX0NBTEwQASpICgtTREtDYWxsVHlwZRIbChdTREtDYWxsVHlwZV9VTlNQRUNJRklFRBAAEgcKA1JBVxABEhMKD01JRERMRVdBUkVfQ0FMTBACMmIKDEV2ZW50U2VydmljZRJSCg1SZWdpc3RlckV2ZW50Eh4uZXZlbnQudjEuUmVnaXN0ZXJFdmVudFJlcXVlc3QaHy5ldmVudC52MS5SZWdpc3RlckV2ZW50UmVzcG9uc2UiAGIGcHJvdG8z"); + fileDesc("ChRldmVudC92MS9ldmVudC5wcm90bxIIZXZlbnQudjEidwoUUmVnaXN0ZXJFdmVudFJlcXVlc3QSIQoEdHlwZRgBIAEoDjITLmV2ZW50LnYxLkV2ZW50VHlwZRIOCgZ1c2VySWQYAiABKAkSJAoHc2RrQ2FsbBgDIAEoCzIRLmV2ZW50LnYxLlNES0NhbGxIAEIGCgRkYXRhIl8KB1NES0NhbGwSKgoLc2RrQ2FsbFR5cGUYASABKA4yFS5ldmVudC52MS5TREtDYWxsVHlwZRIQCgZhbW91bnQYAiABKAJIABINCgN0YWcYAyABKAlIAEIHCgVkZWJpdCInChVSZWdpc3RlckV2ZW50UmVzcG9uc2USDgoGcmFuZG9tGAEgASgJIqUBChJTdHJlYW1FdmVudFJlcXVlc3QSIQoEdHlwZRgBIAEoDjITLmV2ZW50LnYxLkV2ZW50VHlwZRIOCgZ1c2VySWQYAiABKAkSJAoHc2RrQ2FsbBgDIAEoCzIRLmV2ZW50LnYxLlNES0NhbGxIABIuCgxhaVRva2VuVXNhZ2UYBCABKAsyFi5ldmVudC52MS5BSVRva2VuVXNhZ2VIAEIGCgRkYXRhIr0BCgxBSVRva2VuVXNhZ2USDQoFbW9kZWwYASABKAkSEwoLaW5wdXRUb2tlbnMYAiABKAUSFAoMb3V0cHV0VG9rZW5zGAMgASgFEhUKC2lucHV0QW1vdW50GAQgASgCSAASEgoIaW5wdXRUYWcYBSABKAlIABIWCgxvdXRwdXRBbW91bnQYBiABKAJIARITCglvdXRwdXRUYWcYByABKAlIAUIMCgppbnB1dERlYml0Qg0KC291dHB1dERlYml0Ij8KE1N0cmVhbUV2ZW50UmVzcG9uc2USFwoPZXZlbnRzUHJvY2Vzc2VkGAEgASgFEg8KB21lc3NhZ2UYAiABKAkqSQoJRXZlbnRUeXBlEhoKFkVWRU5UX1RZUEVfVU5TUEVDSUZJRUQQABIMCghTREtfQ0FMTBABEhIKDkFJX1RPS0VOX1VTQUdFEAIqSAoLU0RLQ2FsbFR5cGUSGwoXU0RLQ2FsbFR5cGVfVU5TUEVDSUZJRUQQABIHCgNSQVcQARITCg9NSURETEVXQVJFX0NBTEwQAjKzAQoMRXZlbnRTZXJ2aWNlElIKDVJlZ2lzdGVyRXZlbnQSHi5ldmV
udC52MS5SZWdpc3RlckV2ZW50UmVxdWVzdBofLmV2ZW50LnYxLlJlZ2lzdGVyRXZlbnRSZXNwb25zZSIAEk8KDFN0cmVhbUV2ZW50cxIcLmV2ZW50LnYxLlN0cmVhbUV2ZW50UmVxdWVzdBodLmV2ZW50LnYxLlN0cmVhbUV2ZW50UmVzcG9uc2UiACgBYgZwcm90bzM"); /** * @generated from message event.v1.RegisterEventRequest @@ -96,6 +96,128 @@ export type RegisterEventResponse = Message<"event.v1.RegisterEventResponse"> & export const RegisterEventResponseSchema: GenMessage = /*@__PURE__*/ messageDesc(file_event_v1_event, 2); +/** + * @generated from message event.v1.StreamEventRequest + */ +export type StreamEventRequest = Message<"event.v1.StreamEventRequest"> & { + /** + * @generated from field: event.v1.EventType type = 1; + */ + type: EventType; + + /** + * @generated from field: string userId = 2; + */ + userId: string; + + /** + * @generated from oneof event.v1.StreamEventRequest.data + */ + data: { + /** + * @generated from field: event.v1.SDKCall sdkCall = 3; + */ + value: SDKCall; + case: "sdkCall"; + } | { + /** + * @generated from field: event.v1.AITokenUsage aiTokenUsage = 4; + */ + value: AITokenUsage; + case: "aiTokenUsage"; + } | { case: undefined; value?: undefined }; +}; + +/** + * Describes the message event.v1.StreamEventRequest. + * Use `create(StreamEventRequestSchema)` to create a new message. 
+ */ +export const StreamEventRequestSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_event_v1_event, 3); + +/** + * @generated from message event.v1.AITokenUsage + */ +export type AITokenUsage = Message<"event.v1.AITokenUsage"> & { + /** + * @generated from field: string model = 1; + */ + model: string; + + /** + * @generated from field: int32 inputTokens = 2; + */ + inputTokens: number; + + /** + * @generated from field: int32 outputTokens = 3; + */ + outputTokens: number; + + /** + * @generated from oneof event.v1.AITokenUsage.inputDebit + */ + inputDebit: { + /** + * @generated from field: float inputAmount = 4; + */ + value: number; + case: "inputAmount"; + } | { + /** + * @generated from field: string inputTag = 5; + */ + value: string; + case: "inputTag"; + } | { case: undefined; value?: undefined }; + + /** + * @generated from oneof event.v1.AITokenUsage.outputDebit + */ + outputDebit: { + /** + * @generated from field: float outputAmount = 6; + */ + value: number; + case: "outputAmount"; + } | { + /** + * @generated from field: string outputTag = 7; + */ + value: string; + case: "outputTag"; + } | { case: undefined; value?: undefined }; +}; + +/** + * Describes the message event.v1.AITokenUsage. + * Use `create(AITokenUsageSchema)` to create a new message. + */ +export const AITokenUsageSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_event_v1_event, 4); + +/** + * @generated from message event.v1.StreamEventResponse + */ +export type StreamEventResponse = Message<"event.v1.StreamEventResponse"> & { + /** + * @generated from field: int32 eventsProcessed = 1; + */ + eventsProcessed: number; + + /** + * @generated from field: string message = 2; + */ + message: string; +}; + +/** + * Describes the message event.v1.StreamEventResponse. + * Use `create(StreamEventResponseSchema)` to create a new message. 
+ */ +export const StreamEventResponseSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_event_v1_event, 5); + /** * @generated from enum event.v1.EventType */ @@ -109,6 +231,11 @@ export enum EventType { * @generated from enum value: SDK_CALL = 1; */ SDK_CALL = 1, + + /** + * @generated from enum value: AI_TOKEN_USAGE = 2; + */ + AI_TOKEN_USAGE = 2, } /** @@ -157,6 +284,16 @@ export const EventService: GenService<{ input: typeof RegisterEventRequestSchema; output: typeof RegisterEventResponseSchema; }, + /** + * StreamEvents streams events from client to server (e.g., AI token usage) + * + * @generated from rpc event.v1.EventService.StreamEvents + */ + streamEvents: { + methodKind: "client_streaming"; + input: typeof StreamEventRequestSchema; + output: typeof StreamEventResponseSchema; + }, }> = /*@__PURE__*/ serviceDesc(file_event_v1_event, 0); diff --git a/src/interface/event/Event.ts b/src/interface/event/Event.ts index 6d42d8f..603e582 100644 --- a/src/interface/event/Event.ts +++ b/src/interface/event/Event.ts @@ -1,12 +1,19 @@ import { DateTime } from "luxon"; -import { type EventSchemaType } from "../../zod/event"; import { type UserId } from "../../config/identifiers"; export type SDKCallEventData = { - sdkCallType: EventSchemaType["data"]["sdkCallType"]; + sdkCallType: "RAW" | "MIDDLEWARE_CALL"; debitAmount: number; }; +export type AITokenUsageEventData = { + model: string; + inputTokens: number; + outputTokens: number; + inputDebitAmount: number; + outputDebitAmount: number; +}; + export type AddKeyEventData = { name: string; key: string; @@ -24,8 +31,9 @@ export type RequestSDKCallEventData = null; /** * Mapping of event types to their data structures */ -type EventDataMap = { +export type EventDataMap = { SDK_CALL: SDKCallEventData; + AI_TOKEN_USAGE: AITokenUsageEventData; ADD_KEY: AddKeyEventData; PAYMENT: PaymentEventData; REQUEST_PAYMENT: RequestPaymentEventData; @@ -45,6 +53,7 @@ export type BaseEventMetadata = { type EventMetadataMap = { 
ADD_KEY: BaseEventMetadata<"ADD_KEY">; SDK_CALL: BaseEventMetadata<"SDK_CALL"> & { userId: UserId }; + AI_TOKEN_USAGE: BaseEventMetadata<"AI_TOKEN_USAGE"> & { userId: UserId }; PAYMENT: BaseEventMetadata<"PAYMENT"> & { userId: UserId }; REQUEST_PAYMENT: BaseEventMetadata<"REQUEST_PAYMENT"> & { userId: UserId }; REQUEST_SDK_CALL: BaseEventMetadata<"REQUEST_SDK_CALL"> & { userId: UserId }; @@ -80,6 +89,13 @@ export interface SDKCallEventType extends EventType<"SDK_CALL"> { readonly userId: UserId; } +/** + * AI Token Usage Event + */ +export interface AITokenUsageEventType extends EventType<"AI_TOKEN_USAGE"> { + readonly userId: UserId; +} + /** * Add Key Event */ diff --git a/src/routes/gRPC/events/registerEvent.ts b/src/routes/gRPC/events/registerEvent.ts index 04e76e9..102c568 100644 --- a/src/routes/gRPC/events/registerEvent.ts +++ b/src/routes/gRPC/events/registerEvent.ts @@ -4,16 +4,16 @@ import type { } from "../../../gen/event/v1/event_pb"; import { RegisterEventResponseSchema } from "../../../gen/event/v1/event_pb"; import { create } from "@bufbuild/protobuf"; -import { eventSchema } from "../../../zod/event"; -import { type EventType } from "../../../interface/event/Event"; -import { SDKCall } from "../../../events/RawEvents/SDKCall"; import { EventError } from "../../../errors/event"; -import { AuthError } from "../../../errors/auth"; -import { ZodError } from "zod"; -import { StorageAdapterFactory } from "../../../factory"; import type { HandlerContext } from "@connectrpc/connect"; import { apiKeyContextKey } from "../../../context/auth"; import { logger } from "../../../errors/logger"; +import { + extractApiKeyFromContext, + validateAndParseEvent, + createEventInstance, + storeEvent, +} from "../../../utils/eventHelpers"; const OPERATION = "RegisterEvent"; @@ -22,72 +22,36 @@ export async function registerEvent( context: HandlerContext, ): Promise { try { - // Get API key ID from context (set by auth interceptor) - const apiKeyId = 
context.values.get(apiKeyContextKey); - if (!apiKeyId) { - throw AuthError.invalidAPIKey("API key ID not found in context"); - } + // Extract API key ID from context + const apiKeyId = extractApiKeyFromContext(context); - logger.logOperationInfo(OPERATION, "authenticated", "Request authenticated", { - apiKeyId, - }); + logger.logOperationInfo( + OPERATION, + "authenticated", + "Request authenticated", + { + apiKeyId, + }, + ); - // Validate the incoming request against the schema - let eventSkeleton; - try { - eventSkeleton = await eventSchema.parseAsync(req); - } catch (error) { - if (error instanceof EventError) { - throw error; - } - if (error instanceof ZodError) { - const issues = error.issues - .map((issue) => `${issue.path.join(".")}: ${issue.message}`) - .join("; "); - throw EventError.validationFailed(issues, error); - } - throw EventError.validationFailed( - "Unknown validation error", - error as Error, - ); - } + // Validate and parse the incoming event + const eventSkeleton = await validateAndParseEvent(req); - // Create the appropriate event based on type - let event: EventType; + // Create the appropriate event instance + const event = createEventInstance(eventSkeleton); - try { - switch (eventSkeleton.type) { - case "SDK_CALL": - event = new SDKCall(eventSkeleton.userId, eventSkeleton.data); - break; - default: - throw EventError.unsupportedEventType(eventSkeleton.type); - } - } catch (error) { - if (error instanceof EventError) { - throw error; - } - throw EventError.unknown(error as Error); - } + // Store the event + await storeEvent(event, apiKeyId); - // Get the storage adapter and persist the event - try { - const adapter = await StorageAdapterFactory.getStorageAdapter( - event, + logger.logOperationInfo( + OPERATION, + "completed", + "Event stored successfully", + { apiKeyId, - ); - await adapter.add(); - } catch (error) { - throw EventError.serializationError( - "Failed to store event", - error as Error, - ); - } - - 
logger.logOperationInfo(OPERATION, "completed", "Event stored successfully", { - apiKeyId, - userId: eventSkeleton.userId, - }); + userId: eventSkeleton.userId, + }, + ); return create(RegisterEventResponseSchema, { random: "Event stored successfully", diff --git a/src/routes/gRPC/events/streamEvents.ts b/src/routes/gRPC/events/streamEvents.ts new file mode 100644 index 0000000..a0d7e80 --- /dev/null +++ b/src/routes/gRPC/events/streamEvents.ts @@ -0,0 +1,142 @@ +import type { + StreamEventRequest, + StreamEventResponse, +} from "../../../gen/event/v1/event_pb"; +import type { EventType, EventDataMap } from "../../../interface/event/Event"; +import { StreamEventResponseSchema } from "../../../gen/event/v1/event_pb"; +import { create } from "@bufbuild/protobuf"; +import { EventError } from "../../../errors/event"; +import type { HandlerContext } from "@connectrpc/connect"; +import { apiKeyContextKey } from "../../../context/auth"; +import { logger } from "../../../errors/logger"; +import { + extractApiKeyFromContext, + validateAndParseEvent, + createEventInstance, + storeEvent, +} from "../../../utils/eventHelpers"; + +const OPERATION = "StreamEvents"; + +export async function streamEvents( + requestStream: AsyncIterable, + context: HandlerContext, +): Promise { + let eventsProcessed = 0; + const events: Array<{ + event: EventType; + userId: string; + }> = []; + + try { + // Extract API key ID from context + const apiKeyId = extractApiKeyFromContext(context); + + logger.logOperationInfo( + OPERATION, + "authenticated", + "Stream authenticated", + { + apiKeyId, + }, + ); + + // Collect all events from the stream + for await (const req of requestStream) { + try { + // Validate and parse the incoming event + const eventSkeleton = await validateAndParseEvent(req); + + // Create the appropriate event instance + const event = createEventInstance(eventSkeleton); + + // Add to events array instead of storing immediately + events.push({ event, userId: eventSkeleton.userId 
}); + + logger.logOperationInfo( + OPERATION, + "event_validated", + "Event validated and queued", + { + apiKeyId, + userId: eventSkeleton.userId, + eventNumber: events.length, + }, + ); + } catch (error) { + // Log error but continue processing other events + logger.logOperationError( + OPERATION, + "event_validation_failed", + error instanceof EventError ? error.type : "UNKNOWN", + "Failed to validate event in stream", + error instanceof Error ? error : undefined, + { apiKeyId, eventNumber: events.length + 1 }, + ); + + // Continue collecting remaining events + } + } + + // Store all events in one go after stream completes + if (events.length > 0) { + logger.logOperationInfo( + OPERATION, + "storing_batch", + `Storing ${events.length} events in batch`, + { apiKeyId, totalEvents: events.length }, + ); + + for (const { event, userId } of events) { + try { + await storeEvent(event, apiKeyId); + eventsProcessed++; + } catch (error) { + logger.logOperationError( + OPERATION, + "event_storage_failed", + error instanceof EventError ? error.type : "UNKNOWN", + "Failed to store event in batch", + error instanceof Error ? error : undefined, + { apiKeyId, userId, eventNumber: eventsProcessed + 1 }, + ); + } + } + } + + logger.logOperationInfo( + OPERATION, + "completed", + "Stream processing completed", + { + apiKeyId: context.values.get(apiKeyContextKey), + eventsProcessed, + }, + ); + + return create(StreamEventResponseSchema, { + eventsProcessed, + message: `Successfully processed ${eventsProcessed} events`, + }); + } catch (error) { + logger.logOperationError( + OPERATION, + "failed", + error instanceof EventError ? error.type : "UNKNOWN", + "StreamEvents handler failed", + error instanceof Error ? 
error : undefined, + { + apiKeyId: context.values.get(apiKeyContextKey), + eventsProcessed, + }, + ); + + // Re-throw EventError as-is + if (error instanceof EventError) { + throw error; + } + + // Wrap unexpected errors + throw EventError.unknown(error as Error); + } +} diff --git a/src/server.ts b/src/server.ts index ce25764..85be559 100644 --- a/src/server.ts +++ b/src/server.ts @@ -7,6 +7,7 @@ import { AuthService } from "./gen/auth/v1/auth_pb.ts"; import { PaymentService } from "./gen/payment/v1/payment_pb.ts"; import { authInterceptor } from "./interceptors/auth.ts"; import { registerEvent } from "./routes/gRPC/events/registerEvent.ts"; +import { streamEvents } from "./routes/gRPC/events/streamEvents.ts"; import { createAPIKey } from "./routes/gRPC/auth/createAPIKey.ts"; import { createCheckoutLink } from "./routes/gRPC/payment/createCheckoutLink.ts"; import { getPostgresDB } from "./storage/db/postgres/db.ts"; @@ -31,6 +32,7 @@ const grpcHandler = connectNodeAdapter({ // EventService implementation router.service(EventService, { registerEvent, + streamEvents, }); // AuthService implementation diff --git a/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts b/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts new file mode 100644 index 0000000..318c8f7 --- /dev/null +++ b/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts @@ -0,0 +1,210 @@ +import { getPostgresDB } from "../../../db/postgres/db"; +import { + usersTable, + eventsTable, + aiTokenUsageEventsTable, +} from "../../../db/postgres/schema"; +import { StorageError } from "../../../../errors/storage"; +import { type BaseEventMetadata } from "../../../../interface/event/Event"; +import { type UserId } from "../../../../config/identifiers"; +import { logger } from "../../../../errors/logger"; + +const OPERATION = "AddAiTokenUsage"; + +export async function handleAddAiTokenUsage( + event_data: BaseEventMetadata<"AI_TOKEN_USAGE"> & { + userId: UserId; + }, + apiKeyId: string, +): Promise<{ 
id: string } | void> { + const connectionObject = getPostgresDB(); + + try { + logger.logOperationInfo( + OPERATION, + "start", + "Processing AI_TOKEN_USAGE event", + { + userId: event_data.userId, + apiKeyId, + }, + ); + + // Validate input tokens is not negative + const inputTokens = event_data.data.inputTokens; + if (typeof inputTokens === "number" && inputTokens < 0) { + throw StorageError.insertFailed( + `Negative input tokens not allowed for AI token usage for user ${event_data.userId}`, + new Error(`inputTokens ${inputTokens} is negative`), + ); + } + + // Validate output tokens is not negative + const outputTokens = event_data.data.outputTokens; + if (typeof outputTokens === "number" && outputTokens < 0) { + throw StorageError.insertFailed( + `Negative output tokens not allowed for AI token usage for user ${event_data.userId}`, + new Error(`outputTokens ${outputTokens} is negative`), + ); + } + + // Validate input debit amount is not negative + const inputDebitAmount = event_data.data.inputDebitAmount; + if (typeof inputDebitAmount === "number" && inputDebitAmount < 0) { + throw StorageError.insertFailed( + `Negative input debit amount not allowed for AI token usage for user ${event_data.userId}`, + new Error(`inputDebitAmount ${inputDebitAmount} is negative`), + ); + } + + // Validate output debit amount is not negative + const outputDebitAmount = event_data.data.outputDebitAmount; + if (typeof outputDebitAmount === "number" && outputDebitAmount < 0) { + throw StorageError.insertFailed( + `Negative output debit amount not allowed for AI token usage for user ${event_data.userId}`, + new Error(`outputDebitAmount ${outputDebitAmount} is negative`), + ); + } + + await connectionObject.transaction(async (txn) => { + // Insert user if not exists + try { + await txn + .insert(usersTable) + .values({ + id: event_data.userId, + }) + .onConflictDoNothing(); + + logger.logOperationDebug( + OPERATION, + "user_ensured", + "User ensured in database", + { userId: 
event_data.userId }, + ); + } catch (e) { + if ( + e instanceof Error && + e.message.includes('Failed query: insert into "users" ("id")') + ) { + // User already exists, ignore the error + logger.logOperationDebug( + OPERATION, + "user_exists", + "User already exists, continuing", + { userId: event_data.userId }, + ); + } else { + throw StorageError.userInsertFailed( + event_data.userId, + e instanceof Error ? e : new Error(String(e)), + ); + } + } + + // Validate and prepare timestamp + let reported_timestamp; + try { + reported_timestamp = event_data.reported_timestamp.toISO(); + } catch (e) { + throw StorageError.invalidTimestamp( + "Failed to convert reported_timestamp to ISO format", + e instanceof Error ? e : new Error(String(e)), + ); + } + + if (!reported_timestamp || reported_timestamp.trim().length === 0) { + throw StorageError.invalidTimestamp( + "Timestamp is undefined or empty after conversion", + ); + } + + // Insert event + let eventID; + try { + [eventID] = await txn + .insert(eventsTable) + .values({ + reported_timestamp, + userId: event_data.userId, + api_keyId: apiKeyId, + }) + .returning({ id: eventsTable.id }); + } catch (e) { + throw StorageError.eventInsertFailed( + `Failed to insert event for user ${event_data.userId}`, + e instanceof Error ? 
e : new Error(String(e)), + ); + } + + if (!eventID) { + throw StorageError.emptyResult("Event insert returned no ID"); + } + + logger.logOperationInfo( + OPERATION, + "event_inserted", + "Event row inserted", + { eventId: eventID.id, userId: event_data.userId, apiKeyId }, + ); + + // Insert AI token usage event + try { + const aiData = event_data.data; + + await txn.insert(aiTokenUsageEventsTable).values({ + id: eventID.id, + model: aiData.model, + inputTokens: aiData.inputTokens, + outputTokens: aiData.outputTokens, + inputDebitAmount: aiData.inputDebitAmount, + outputDebitAmount: aiData.outputDebitAmount, + }); + + logger.logOperationInfo( + OPERATION, + "ai_token_usage_inserted", + "AI token usage event inserted successfully", + { + eventId: eventID.id, + model: aiData.model, + inputTokens: aiData.inputTokens, + outputTokens: aiData.outputTokens, + inputDebitAmount: aiData.inputDebitAmount, + outputDebitAmount: aiData.outputDebitAmount, + userId: event_data.userId, + }, + ); + } catch (e) { + throw StorageError.insertFailed( + `Failed to insert AI token usage event for event ID ${eventID.id}`, + e instanceof Error ? e : new Error(String(e)), + ); + } + + return { id: eventID.id }; + }); + + logger.logOperationInfo( + OPERATION, + "completed", + "AI_TOKEN_USAGE transaction completed successfully", + { userId: event_data.userId, apiKeyId }, + ); + } catch (e) { + // Use duck typing instead of instanceof to work with mocked modules + if ( + e && + typeof e === "object" && + "type" in e && + (e as any).name === "StorageError" + ) { + throw e; + } + + throw StorageError.transactionFailed( + "Transaction failed while storing AI_TOKEN_USAGE event", + e instanceof Error ?
e : new Error(String(e)), + ); + } +} diff --git a/src/storage/adapter/postgres/handlers/index.ts b/src/storage/adapter/postgres/handlers/index.ts index 4863f22..705dd8e 100644 --- a/src/storage/adapter/postgres/handlers/index.ts +++ b/src/storage/adapter/postgres/handlers/index.ts @@ -3,3 +3,4 @@ export { handleAddKey } from "./addKey"; export { handleAddPayment } from "./addPayment"; export { handlePriceRequestPayment } from "./priceRequestPayment"; export { handlePriceRequestSdkCall } from "./priceRequestSdkCall"; +export { handleAddAiTokenUsage } from "./addAiTokenUsage"; diff --git a/src/storage/adapter/postgres/postgres.ts b/src/storage/adapter/postgres/postgres.ts index 3938681..890951d 100644 --- a/src/storage/adapter/postgres/postgres.ts +++ b/src/storage/adapter/postgres/postgres.ts @@ -8,6 +8,7 @@ import { handleAddPayment, handlePriceRequestPayment, handlePriceRequestSdkCall, + handleAddAiTokenUsage, } from "./handlers"; import { logger } from "../../../errors/logger"; @@ -68,6 +69,13 @@ export class PostgresAdapter implements StorageAdapterType { return await handleAddPayment(event_data, this.apiKeyId); } + case "AI_TOKEN_USAGE": { + if (!this.apiKeyId) { + throw StorageError.missingApiKeyId(); + } + return await handleAddAiTokenUsage(event_data, this.apiKeyId); + } + default: { throw StorageError.unknownEventType(event_data.type); } diff --git a/src/storage/db/postgres/schema.ts b/src/storage/db/postgres/schema.ts index 4b42b63..5b38b94 100644 --- a/src/storage/db/postgres/schema.ts +++ b/src/storage/db/postgres/schema.ts @@ -71,6 +71,10 @@ export const eventsRelation = relations(eventsTable, ({ one }) => ({ fields: [eventsTable.id], references: [paymentEventsTable.id], }), + aiTokenUsageEvent: one(aiTokenUsageEventsTable, { + fields: [eventsTable.id], + references: [aiTokenUsageEventsTable.id], + }), })); export const sdkCallEventsTable = pgTable("sdk_call_events", { @@ -113,3 +117,24 @@ export const tagsTable = pgTable("tags", { tag: 
text("key").notNull(), amount: integer("amount").notNull(), }); + +export const aiTokenUsageEventsTable = pgTable("ai_token_usage_events", { + id: uuid("id") + .references(() => eventsTable.id) + .primaryKey(), + model: text("model").notNull(), + inputTokens: integer("input_tokens").notNull(), + outputTokens: integer("output_tokens").notNull(), + inputDebitAmount: integer("input_debit_amount").notNull(), + outputDebitAmount: integer("output_debit_amount").notNull(), +}); + +export const aiTokenUsageEventsRelation = relations( + aiTokenUsageEventsTable, + ({ one }) => ({ + event: one(eventsTable, { + fields: [aiTokenUsageEventsTable.id], + references: [eventsTable.id], + }), + }), +); diff --git a/src/utils/eventHelpers.ts b/src/utils/eventHelpers.ts new file mode 100644 index 0000000..c0b4e77 --- /dev/null +++ b/src/utils/eventHelpers.ts @@ -0,0 +1,96 @@ +import type { HandlerContext } from "@connectrpc/connect"; +import { apiKeyContextKey } from "../context/auth"; +import { AuthError } from "../errors/auth"; +import { EventError } from "../errors/event"; +import { eventSchema } from "../zod/event"; +import { ZodError } from "zod"; +import type { EventType } from "../interface/event/Event"; +import { SDKCall } from "../events/RawEvents/SDKCall"; +import { AITokenUsage } from "../events/AIEvents/AITokenUsage"; +import { StorageAdapterFactory } from "../factory"; +import type { + RegisterEventRequest, + StreamEventRequest, +} from "../gen/event/v1/event_pb"; + +/** + * Extract API key ID from the request context + */ +export function extractApiKeyFromContext(context: HandlerContext): string { + const apiKeyId = context.values.get(apiKeyContextKey); + if (!apiKeyId) { + throw AuthError.invalidAPIKey("API key ID not found in context"); + } + return apiKeyId; +} + +/** + * Validate and parse the incoming event request + */ +export async function validateAndParseEvent( + req: RegisterEventRequest | StreamEventRequest, +) { + try { + return await 
eventSchema.parseAsync(req); + } catch (error) { + if (error instanceof EventError) { + throw error; + } + if (error instanceof ZodError) { + const issues = error.issues + .map((issue) => `${issue.path.join(".")}: ${issue.message}`) + .join("; "); + throw EventError.validationFailed(issues, error); + } + throw EventError.validationFailed( + "Unknown validation error", + error as Error, + ); + } +} + +/** + * Create the appropriate event instance based on the event skeleton + */ +export function createEventInstance(eventSkeleton: { + type: string; + userId: string; + data: any; +}): EventType { + try { + switch (eventSkeleton.type) { + case "SDK_CALL": + return new SDKCall(eventSkeleton.userId, eventSkeleton.data); + case "AI_TOKEN_USAGE": + return new AITokenUsage(eventSkeleton.userId, eventSkeleton.data); + default: + throw EventError.unsupportedEventType(eventSkeleton.type); + } + } catch (error) { + if (error instanceof EventError) { + throw error; + } + throw EventError.unknown(error as Error); + } +} + +/** + * Store the event using the appropriate storage adapter + */ +export async function storeEvent( + event: EventType, + apiKeyId: string, +): Promise { + try { + const adapter = await StorageAdapterFactory.getStorageAdapter( + event, + apiKeyId, + ); + await adapter.add(); + } catch (error) { + throw EventError.serializationError( + "Failed to store event", + error as Error, + ); + } +} diff --git a/src/zod/event.ts b/src/zod/event.ts index d16a94a..88f56df 100644 --- a/src/zod/event.ts +++ b/src/zod/event.ts @@ -71,5 +71,110 @@ const SDKCallEvent = BaseEvent.extend({ .transform((obj) => obj.value), }); -export const eventSchema = z.discriminatedUnion("type", [SDKCallEvent]); +const AITokenUsageEvent = BaseEvent.extend({ + type: z + .literal(2) + .transform(() => "AI_TOKEN_USAGE") as z.ZodType<"AI_TOKEN_USAGE">, + data: z + .object({ + case: z.literal("aiTokenUsage"), + value: z + .object({ + model: z.string().min(1), + inputTokens: z.number().int().min(0), 
+ outputTokens: z.number().int().min(0), + inputDebit: z.union([ + z.object({ + case: z.literal("inputAmount"), + value: z.number().min(0), + }), + z.object({ + case: z.literal("inputTag"), + value: z.string(), + }), + ]), + outputDebit: z.union([ + z.object({ + case: z.literal("outputAmount"), + value: z.number().min(0), + }), + z.object({ + case: z.literal("outputTag"), + value: z.string(), + }), + ]), + }) + .transform(async (v) => { + const db = getPostgresDB(); + + // Process input debit + let inputDebitAmount: number; + if (v.inputDebit.case === "inputTag") { + try { + const [tagRow] = await db + .select() + .from(tagsTable) + .where(eq(tagsTable.tag, v.inputDebit.value)) + .limit(1); + + if (!tagRow) { + throw EventError.validationFailed( + `Input tag not found: ${v.inputDebit.value}`, + ); + } + + inputDebitAmount = tagRow.amount; + } catch (e) { + if (e instanceof EventError) { + throw e; + } + throw EventError.unknown(e as Error); + } + } else { + inputDebitAmount = Math.floor(v.inputDebit.value * 100); + } + + // Process output debit + let outputDebitAmount: number; + if (v.outputDebit.case === "outputTag") { + try { + const [tagRow] = await db + .select() + .from(tagsTable) + .where(eq(tagsTable.tag, v.outputDebit.value)) + .limit(1); + + if (!tagRow) { + throw EventError.validationFailed( + `Output tag not found: ${v.outputDebit.value}`, + ); + } + + outputDebitAmount = tagRow.amount; + } catch (e) { + if (e instanceof EventError) { + throw e; + } + throw EventError.unknown(e as Error); + } + } else { + outputDebitAmount = Math.floor(v.outputDebit.value * 100); + } + + return { + model: v.model, + inputTokens: v.inputTokens, + outputTokens: v.outputTokens, + inputDebitAmount, + outputDebitAmount, + }; + }), + }) + .transform((obj) => obj.value), +}); + +export const eventSchema = z.discriminatedUnion("type", [ + SDKCallEvent, + AITokenUsageEvent, +]); export type EventSchemaType = z.infer; From 9b9c5eb9cbc7fadf678a2e31ec98406e921b53ab Mon Sep 17 
00:00:00 2001 From: Jayadeep Bejoy Date: Tue, 16 Dec 2025 23:43:55 +0400 Subject: [PATCH 2/4] perf(ai_batching) Signed-off-by: Jayadeep Bejoy --- src/routes/gRPC/events/streamEvents.ts | 37 +-- .../postgres/handlers/addAiTokenUsage.ts | 313 ++++++++++++------ src/storage/adapter/postgres/postgres.ts | 2 +- src/utils/eventHelpers.ts | 65 ++++ 4 files changed, 287 insertions(+), 130 deletions(-) diff --git a/src/routes/gRPC/events/streamEvents.ts b/src/routes/gRPC/events/streamEvents.ts index a0d7e80..b83fc97 100644 --- a/src/routes/gRPC/events/streamEvents.ts +++ b/src/routes/gRPC/events/streamEvents.ts @@ -13,7 +13,7 @@ import { extractApiKeyFromContext, validateAndParseEvent, createEventInstance, - storeEvent, + storeEventsBatch, } from "../../../utils/eventHelpers"; const OPERATION = "StreamEvents"; @@ -23,10 +23,7 @@ export async function streamEvents( context: HandlerContext, ): Promise { let eventsProcessed = 0; - const events: Array<{ - event: EventType; - userId: string; - }> = []; + const events: Array> = []; try { // Extract API key ID from context @@ -51,7 +48,7 @@ export async function streamEvents( const event = createEventInstance(eventSkeleton); // Add to events array instead of storing immediately - events.push({ event, userId: eventSkeleton.userId }); + events.push(event); logger.logOperationInfo( OPERATION, @@ -78,7 +75,7 @@ export async function streamEvents( } } - // Store all events in one go after stream completes + // Store all events in one batch after stream completes if (events.length > 0) { logger.logOperationInfo( OPERATION, @@ -87,20 +84,18 @@ export async function streamEvents( { apiKeyId, totalEvents: events.length }, ); - for (const { event, userId } of events) { - try { - await storeEvent(event, apiKeyId); - eventsProcessed++; - } catch (error) { - logger.logOperationError( - OPERATION, - "event_storage_failed", - error instanceof EventError ? error.type : "UNKNOWN", - "Failed to store event in batch", - error instanceof Error ? 
error : undefined, - { apiKeyId, userId, eventNumber: eventsProcessed + 1 }, - ); - } + try { + await storeEventsBatch(events, apiKeyId); + eventsProcessed = events.length; + } catch (error) { + logger.logOperationError( + OPERATION, + "batch_storage_failed", + error instanceof EventError ? error.type : "UNKNOWN", + "Failed to store events in batch", + error instanceof Error ? error : undefined, + { apiKeyId, totalEvents: events.length }, + ); } } diff --git a/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts b/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts index 318c8f7..aa10290 100644 --- a/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts +++ b/src/storage/adapter/postgres/handlers/addAiTokenUsage.ts @@ -11,185 +11,282 @@ import { logger } from "../../../../errors/logger"; const OPERATION = "AddAiTokenUsage"; +type AggregatedEvent = { + userId: UserId; + model: string; + inputTokens: number; + outputTokens: number; + inputDebitAmount: number; + outputDebitAmount: number; + reported_timestamp: string; +}; + export async function handleAddAiTokenUsage( - event_data: BaseEventMetadata<"AI_TOKEN_USAGE"> & { - userId: UserId; - }, + events: Array< + BaseEventMetadata<"AI_TOKEN_USAGE"> & { + userId: UserId; + } + >, apiKeyId: string, ): Promise<{ id: string } | void> { const connectionObject = getPostgresDB(); + if (events.length === 0) { + logger.logOperationInfo(OPERATION, "skipped", "No events to process", { + apiKeyId, + }); + return; + } + try { logger.logOperationInfo( OPERATION, "start", - "Processing AI_TOKEN_USAGE event", + `Processing ${events.length} AI_TOKEN_USAGE event(s)`, { - userId: event_data.userId, + eventCount: events.length, apiKeyId, }, ); - // Validate input tokens is not negative - const inputTokens = event_data.data.inputTokens; - if (typeof inputTokens === "number" && inputTokens < 0) { - throw StorageError.insertFailed( - `Negative input tokens not allowed for AI token usage for user ${event_data.userId}`, - new 
Error(`inputTokens ${inputTokens} is negative`), - ); - } + // Validate all events before processing + for (const event_data of events) { + // Validate input tokens is not negative + const inputTokens = event_data.data.inputTokens; + if (typeof inputTokens === "number" && inputTokens < 0) { + throw StorageError.insertFailed( + `Negative input tokens not allowed for AI token usage for user ${event_data.userId}`, + new Error(`inputTokens ${inputTokens} is negative`), + ); + } - // Validate output tokens is not negative - const outputTokens = event_data.data.outputTokens; - if (typeof outputTokens === "number" && outputTokens < 0) { - throw StorageError.insertFailed( - `Negative output tokens not allowed for AI token usage for user ${event_data.userId}`, - new Error(`outputTokens ${outputTokens} is negative`), - ); - } + // Validate output tokens is not negative + const outputTokens = event_data.data.outputTokens; + if (typeof outputTokens === "number" && outputTokens < 0) { + throw StorageError.insertFailed( + `Negative output tokens not allowed for AI token usage for user ${event_data.userId}`, + new Error(`outputTokens ${outputTokens} is negative`), + ); + } - // Validate input debit amount is not negative - const inputDebitAmount = event_data.data.inputDebitAmount; - if (typeof inputDebitAmount === "number" && inputDebitAmount < 0) { - throw StorageError.insertFailed( - `Negative input debit amount not allowed for AI token usage for user ${event_data.userId}`, - new Error(`inputDebitAmount ${inputDebitAmount} is negative`), - ); + // Validate input debit amount is not negative + const inputDebitAmount = event_data.data.inputDebitAmount; + if (typeof inputDebitAmount === "number" && inputDebitAmount < 0) { + throw StorageError.insertFailed( + `Negative input debit amount not allowed for AI token usage for user ${event_data.userId}`, + new Error(`inputDebitAmount ${inputDebitAmount} is negative`), + ); + } + + // Validate output debit amount is not negative + const 
outputDebitAmount = event_data.data.outputDebitAmount; + if (typeof outputDebitAmount === "number" && outputDebitAmount < 0) { + throw StorageError.insertFailed( + `Negative output debit amount not allowed for AI token usage for user ${event_data.userId}`, + new Error(`outputDebitAmount ${outputDebitAmount} is negative`), + ); + } } - // Validate output debit amount is not negative - const outputDebitAmount = event_data.data.outputDebitAmount; - if (typeof outputDebitAmount === "number" && outputDebitAmount < 0) { - throw StorageError.insertFailed( - `Negative output debit amount not allowed for AI token usage for user ${event_data.userId}`, - new Error(`outputDebitAmount ${outputDebitAmount} is negative`), - ); + // Aggregate events by userId and model + const aggregationMap = new Map(); + + for (const event_data of events) { + let reported_timestamp; + try { + reported_timestamp = event_data.reported_timestamp.toISO(); + } catch (e) { + throw StorageError.invalidTimestamp( + "Failed to convert reported_timestamp to ISO format", + e instanceof Error ? 
e : new Error(String(e)), + ); + } + + if (!reported_timestamp || reported_timestamp.trim().length === 0) { + throw StorageError.invalidTimestamp( + "Timestamp is undefined or empty after conversion", + ); + } + + const key = `${event_data.userId}:${event_data.data.model}`; + const existing = aggregationMap.get(key); + + if (existing) { + // Aggregate with existing entry + existing.inputTokens += event_data.data.inputTokens; + existing.outputTokens += event_data.data.outputTokens; + existing.inputDebitAmount += event_data.data.inputDebitAmount; + existing.outputDebitAmount += event_data.data.outputDebitAmount; + // Use the latest timestamp + if (reported_timestamp > existing.reported_timestamp) { + existing.reported_timestamp = reported_timestamp; + } + } else { + // Create new aggregated entry + aggregationMap.set(key, { + userId: event_data.userId, + model: event_data.data.model, + inputTokens: event_data.data.inputTokens, + outputTokens: event_data.data.outputTokens, + inputDebitAmount: event_data.data.inputDebitAmount, + outputDebitAmount: event_data.data.outputDebitAmount, + reported_timestamp, + }); + } } + const aggregatedEvents = Array.from(aggregationMap.values()); + + logger.logOperationInfo( + OPERATION, + "aggregated", + `Aggregated ${events.length} event(s) into ${aggregatedEvents.length} unique (userId, model) combination(s)`, + { + originalCount: events.length, + aggregatedCount: aggregatedEvents.length, + apiKeyId, + }, + ); + await connectionObject.transaction(async (txn) => { - // Insert user if not exists + // Collect unique user IDs + const uniqueUserIds = Array.from( + new Set(aggregatedEvents.map((event) => event.userId)), + ); + + // Batch insert users if not exists try { - await txn - .insert(usersTable) - .values({ - id: event_data.userId, - }) - .onConflictDoNothing(); - - logger.logOperationDebug( - OPERATION, - "user_ensured", - "User ensured in database", - { userId: event_data.userId }, - ); + if (uniqueUserIds.length > 0) { + await 
txn + .insert(usersTable) + .values(uniqueUserIds.map((id) => ({ id }))) + .onConflictDoNothing(); + + logger.logOperationDebug( + OPERATION, + "users_ensured", + "Users ensured in database", + { userCount: uniqueUserIds.length }, + ); + } } catch (e) { if ( e instanceof Error && e.message.includes('Failed query: insert into "users" ("id")') ) { - // User already exists, ignore the error + // Users already exist, ignore the error logger.logOperationDebug( OPERATION, - "user_exists", - "User already exists, continuing", - { userId: event_data.userId }, + "users_exist", + "Users already exist, continuing", + { userCount: uniqueUserIds.length }, ); } else { throw StorageError.userInsertFailed( - event_data.userId, + uniqueUserIds.join(", "), e instanceof Error ? e : new Error(String(e)), ); } } - // Validate and prepare timestamp - let reported_timestamp; - try { - reported_timestamp = event_data.reported_timestamp.toISO(); - } catch (e) { - throw StorageError.invalidTimestamp( - "Failed to convert reported_timestamp to ISO format", - e instanceof Error ? e : new Error(String(e)), - ); - } - - if (!reported_timestamp || reported_timestamp.trim().length === 0) { - throw StorageError.invalidTimestamp( - "Timestamp is undefined or empty after conversion", - ); - } + // Prepare event values for batch insert + const eventValues = aggregatedEvents.map((aggEvent) => ({ + reported_timestamp: aggEvent.reported_timestamp, + userId: aggEvent.userId, + api_keyId: apiKeyId, + })); - // Insert event - let eventID; + // Batch insert events + let eventIDs; try { - [eventID] = await txn + eventIDs = await txn .insert(eventsTable) - .values({ - reported_timestamp, - userId: event_data.userId, - api_keyId: apiKeyId, - }) + .values(eventValues) .returning({ id: eventsTable.id }); } catch (e) { throw StorageError.eventInsertFailed( - `Failed to insert event for user ${event_data.userId}`, + `Failed to batch insert ${aggregatedEvents.length} aggregated event(s)`, e instanceof Error ? 
e : new Error(String(e)), ); } - if (!eventID) { - throw StorageError.emptyResult("Event insert returned no ID"); + if (!eventIDs || eventIDs.length === 0) { + throw StorageError.emptyResult("Event insert returned no IDs"); + } + + if (eventIDs.length !== aggregatedEvents.length) { + throw StorageError.insertFailed( + `Expected ${aggregatedEvents.length} event IDs but got ${eventIDs.length}`, + new Error("Event ID count mismatch"), + ); } logger.logOperationInfo( OPERATION, - "event_inserted", - "Event row inserted", - { eventId: eventID.id, userId: event_data.userId, apiKeyId }, + "events_inserted", + `${eventIDs.length} event row(s) inserted`, + { eventCount: eventIDs.length, apiKeyId }, ); - // Insert AI token usage event + // Prepare AI token usage values for batch insert + const aiTokenUsageValues = aggregatedEvents.map((aggEvent, index) => { + const eventId = eventIDs[index]; + if (!eventId) { + throw StorageError.insertFailed( + `Missing event ID at index ${index}`, + new Error("Event ID is undefined"), + ); + } + return { + id: eventId.id, + model: aggEvent.model, + inputTokens: aggEvent.inputTokens, + outputTokens: aggEvent.outputTokens, + inputDebitAmount: aggEvent.inputDebitAmount, + outputDebitAmount: aggEvent.outputDebitAmount, + }; + }); + + // Batch insert AI token usage events try { - const aiData = event_data.data; - - await txn.insert(aiTokenUsageEventsTable).values({ - id: eventID.id, - model: aiData.model, - inputTokens: aiData.inputTokens, - outputTokens: aiData.outputTokens, - inputDebitAmount: aiData.inputDebitAmount, - outputDebitAmount: aiData.outputDebitAmount, - }); + await txn.insert(aiTokenUsageEventsTable).values(aiTokenUsageValues); logger.logOperationInfo( OPERATION, "ai_token_usage_inserted", - "AI token usage event inserted successfully", + `${aiTokenUsageValues.length} AI token usage event(s) inserted successfully`, { - eventId: eventID.id, - model: aiData.model, - inputTokens: aiData.inputTokens, - outputTokens: 
aiData.outputTokens, - inputDebitAmount: aiData.inputDebitAmount, - outputDebitAmount: aiData.outputDebitAmount, - userId: event_data.userId, + eventCount: aiTokenUsageValues.length, + apiKeyId, }, ); } catch (e) { throw StorageError.insertFailed( - `Failed to insert AI token usage event for event ID ${eventID.id}`, + `Failed to batch insert AI token usage events`, e instanceof Error ? e : new Error(String(e)), ); } - return { id: eventID }; + const firstEvent = eventIDs[0]; + if (!firstEvent || !firstEvent.id) { + throw StorageError.insertFailed( + "Missing or invalid ID for the first inserted event", + new Error(`Invalid first event ID: ${JSON.stringify(firstEvent)}`), + ); + } + + return { id: firstEvent.id }; }); logger.logOperationInfo( OPERATION, "completed", - "AI_TOKEN_USAGE transaction completed successfully", - { userId: event_data.userId, apiKeyId }, + `AI_TOKEN_USAGE batch transaction completed successfully - processed ${events.length} event(s), inserted ${aggregatedEvents.length} row(s)`, + { + originalCount: events.length, + aggregatedCount: aggregatedEvents.length, + apiKeyId, + }, ); } catch (e) { // Use duck typing instead of instanceof to work with mocked modules @@ -203,7 +300,7 @@ export async function handleAddAiTokenUsage( } throw StorageError.transactionFailed( - "Transaction failed while storing AI_TOKEN_USAGE event", + `Transaction failed while storing ${events.length} AI_TOKEN_USAGE event(s)`, e instanceof Error ? 
e : new Error(String(e)), ); } diff --git a/src/storage/adapter/postgres/postgres.ts b/src/storage/adapter/postgres/postgres.ts index 890951d..d6e03be 100644 --- a/src/storage/adapter/postgres/postgres.ts +++ b/src/storage/adapter/postgres/postgres.ts @@ -73,7 +73,7 @@ export class PostgresAdapter implements StorageAdapterType { if (!this.apiKeyId) { throw StorageError.missingApiKeyId(); } - return await handleAddAiTokenUsage(event_data, this.apiKeyId); + return await handleAddAiTokenUsage([event_data], this.apiKeyId); } default: { diff --git a/src/utils/eventHelpers.ts b/src/utils/eventHelpers.ts index c0b4e77..887da9b 100644 --- a/src/utils/eventHelpers.ts +++ b/src/utils/eventHelpers.ts @@ -8,6 +8,8 @@ import type { EventType } from "../interface/event/Event"; import { SDKCall } from "../events/RawEvents/SDKCall"; import { AITokenUsage } from "../events/AIEvents/AITokenUsage"; import { StorageAdapterFactory } from "../factory"; +import { handleAddAiTokenUsage } from "../storage/adapter/postgres/handlers"; +import { StorageError } from "../errors/storage"; import type { RegisterEventRequest, StreamEventRequest, @@ -94,3 +96,66 @@ export async function storeEvent( ); } } + +/** + * Store multiple events in a batch - groups by type and uses batch operations when possible + */ +export async function storeEventsBatch( + events: EventType[], + apiKeyId: string, +): Promise { + if (events.length === 0) { + return; + } + + // Group events by type + const eventsByType = new Map(); + for (const event of events) { + const type = event.type; + if (!eventsByType.has(type)) { + eventsByType.set(type, []); + } + eventsByType.get(type)!.push(event); + } + + // Process each type + for (const [type, typeEvents] of eventsByType) { + try { + if (type === "AI_TOKEN_USAGE") { + // Batch process AI_TOKEN_USAGE events + const serializedEvents: Array< + import("../interface/event/Event").BaseEventMetadata<"AI_TOKEN_USAGE"> & { + userId: string; + } + > = []; + + for (const event of 
typeEvents) { + const { SQL } = event.serialize(); + if (!SQL) { + throw StorageError.serializationFailed( + "Event serialization returned null or undefined", + ); + } + if (SQL.type !== "AI_TOKEN_USAGE") { + throw StorageError.serializationFailed( + `Expected AI_TOKEN_USAGE but got ${SQL.type}`, + ); + } + serializedEvents.push(SQL as any); + } + + await handleAddAiTokenUsage(serializedEvents, apiKeyId); + } else { + // For other event types, use individual storage + for (const event of typeEvents) { + await storeEvent(event, apiKeyId); + } + } + } catch (error) { + throw EventError.serializationError( + `Failed to store ${type} events`, + error as Error, + ); + } + } +} From c76688f75968c3920552527df49e9bd6535568fd Mon Sep 17 00:00:00 2001 From: Jayadeep Bejoy Date: Tue, 16 Dec 2025 23:44:21 +0400 Subject: [PATCH 3/4] test(ai_stream) Signed-off-by: Jayadeep Bejoy --- .../storage/postgres/addAiTokenUsage.test.ts | 584 ++++++++++++++++++ 1 file changed, 584 insertions(+) create mode 100644 src/__tests__/unit/storage/postgres/addAiTokenUsage.test.ts diff --git a/src/__tests__/unit/storage/postgres/addAiTokenUsage.test.ts b/src/__tests__/unit/storage/postgres/addAiTokenUsage.test.ts new file mode 100644 index 0000000..67309b6 --- /dev/null +++ b/src/__tests__/unit/storage/postgres/addAiTokenUsage.test.ts @@ -0,0 +1,584 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { handleAddAiTokenUsage } from "../../../../storage/adapter/postgres/handlers/addAiTokenUsage"; +import * as dbModule from "../../../../storage/db/postgres/db"; +import { DateTime } from "luxon"; + +describe("handleAddAiTokenUsage - Aggregation and Batch Insert", () => { + let mockTransaction: any; + let mockDb: any; + + beforeEach(() => { + mockTransaction = { + insert: vi.fn().mockReturnThis(), + values: vi.fn().mockReturnThis(), + returning: vi.fn(), + onConflictDoNothing: vi.fn().mockReturnThis(), + }; + + mockDb = { + transaction: vi.fn(async (callback) => { + 
return await callback(mockTransaction); + }), + }; + + vi.spyOn(dbModule, "getPostgresDB").mockReturnValue(mockDb as any); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe("aggregation logic", () => { + it("aggregates multiple events for same user and model into one row", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 50, + outputTokens: 25, + inputDebitAmount: 5, + outputDebitAmount: 2, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([{ id: "event-1" }]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + // Should insert only 1 event (aggregated) + const eventInsertCall = mockTransaction.values.mock.calls[1]; + expect(eventInsertCall[0]).toHaveLength(1); + expect(eventInsertCall[0][0].userId).toBe("user-1"); + + // Should insert only 1 AI token usage record with aggregated values + const aiTokenUsageInsertCall = mockTransaction.values.mock.calls[2]; + expect(aiTokenUsageInsertCall[0]).toHaveLength(1); + expect(aiTokenUsageInsertCall[0][0]).toEqual({ + id: "event-1", + model: "gpt-4", + inputTokens: 350, // 100 + 200 + 50 + outputTokens: 175, // 50 + 100 + 25 + inputDebitAmount: 35, // 10 + 20 + 5 + outputDebitAmount: 17, // 5 + 10 + 2 + }); + }); + + it("creates separate rows for different models of same user", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + 
model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "claude-3-opus", + inputTokens: 150, + outputTokens: 75, + inputDebitAmount: 15, + outputDebitAmount: 7, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([ + { id: "event-1" }, + { id: "event-2" }, + ]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + // Should insert 2 events (one per model) + const eventInsertCall = mockTransaction.values.mock.calls[1]; + expect(eventInsertCall[0]).toHaveLength(2); + + // Should insert 2 AI token usage records + const aiTokenUsageInsertCall = mockTransaction.values.mock.calls[2]; + expect(aiTokenUsageInsertCall[0]).toHaveLength(2); + + // Find GPT-4 aggregated record + const gpt4Record = aiTokenUsageInsertCall[0].find( + (r: any) => r.model === "gpt-4", + ); + expect(gpt4Record).toEqual({ + id: expect.any(String), + model: "gpt-4", + inputTokens: 300, // 100 + 200 + outputTokens: 150, // 50 + 100 + inputDebitAmount: 30, // 10 + 20 + outputDebitAmount: 15, // 5 + 10 + }); + + // Find Claude aggregated record + const claudeRecord = aiTokenUsageInsertCall[0].find( + (r: any) => r.model === "claude-3-opus", + ); + expect(claudeRecord).toEqual({ + id: expect.any(String), + model: "claude-3-opus", + inputTokens: 150, + outputTokens: 75, + inputDebitAmount: 15, + outputDebitAmount: 7, + }); + }); + + it("creates separate rows for different users with same model", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + 
inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-2", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 50, + outputTokens: 25, + inputDebitAmount: 5, + outputDebitAmount: 2, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([ + { id: "event-1" }, + { id: "event-2" }, + ]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + // Should insert 2 events (one per user) + const eventInsertCall = mockTransaction.values.mock.calls[1]; + expect(eventInsertCall[0]).toHaveLength(2); + + // Should insert 2 AI token usage records + const aiTokenUsageInsertCall = mockTransaction.values.mock.calls[2]; + expect(aiTokenUsageInsertCall[0]).toHaveLength(2); + }); + + it("handles complex scenario with multiple users and models", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "claude-3-sonnet", + inputTokens: 150, + outputTokens: 75, + inputDebitAmount: 15, + outputDebitAmount: 7, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-2", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 300, + outputTokens: 150, + inputDebitAmount: 30, + outputDebitAmount: 15, + }, + 
}, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-2", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([ + { id: "event-1" }, + { id: "event-2" }, + { id: "event-3" }, + ]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + // Should insert 3 aggregated events: + // 1. user-1 + gpt-4 + // 2. user-1 + claude-3-sonnet + // 3. user-2 + gpt-4 + const eventInsertCall = mockTransaction.values.mock.calls[1]; + expect(eventInsertCall[0]).toHaveLength(3); + + const aiTokenUsageInsertCall = mockTransaction.values.mock.calls[2]; + expect(aiTokenUsageInsertCall[0]).toHaveLength(3); + + // Verify aggregation: should have 2 gpt-4 records and 1 claude record + const gpt4Records = aiTokenUsageInsertCall[0].filter( + (r: any) => r.model === "gpt-4", + ); + expect(gpt4Records).toHaveLength(2); + + const claudeRecords = aiTokenUsageInsertCall[0].filter( + (r: any) => r.model === "claude-3-sonnet", + ); + expect(claudeRecords).toHaveLength(1); + + // Verify the gpt-4 records have correct aggregated values + const gpt4Tokens = gpt4Records.map((r: any) => r.inputTokens).sort(); + expect(gpt4Tokens).toEqual([300, 400]); // user-1: 100+200=300, user-2: 300+100=400 + }); + }); + + describe("single event handling", () => { + it("handles single event without aggregation", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([{ id: "event-1" }]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + const aiTokenUsageInsertCall = mockTransaction.values.mock.calls[2]; + expect(aiTokenUsageInsertCall[0]).toHaveLength(1); + 
expect(aiTokenUsageInsertCall[0][0]).toEqual({ + id: "event-1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }); + }); + }); + + describe("user insertion", () => { + it("batch inserts all unique users", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-2", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-3", + reported_timestamp: DateTime.now(), + data: { + model: "claude-3-opus", + inputTokens: 150, + outputTokens: 75, + inputDebitAmount: 15, + outputDebitAmount: 7, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([ + { id: "event-1" }, + { id: "event-2" }, + { id: "event-3" }, + ]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + // Check users insert + const usersInsertCall = mockTransaction.values.mock.calls[0]; + expect(usersInsertCall[0]).toHaveLength(3); + expect(usersInsertCall[0]).toEqual( + expect.arrayContaining([ + { id: "user-1" }, + { id: "user-2" }, + { id: "user-3" }, + ]), + ); + }); + + it("inserts duplicate users only once", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "claude-3-opus", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + ]; + + 
mockTransaction.returning.mockResolvedValueOnce([ + { id: "event-1" }, + { id: "event-2" }, + ]); + + await handleAddAiTokenUsage(events, "api-key-123"); + + // Should only insert user-1 once + const usersInsertCall = mockTransaction.values.mock.calls[0]; + expect(usersInsertCall[0]).toHaveLength(1); + expect(usersInsertCall[0][0]).toEqual({ id: "user-1" }); + }); + }); + + describe("edge cases", () => { + it("handles empty array", async () => { + const result = await handleAddAiTokenUsage([], "api-key-123"); + expect(result).toBeUndefined(); + expect(mockTransaction.insert).not.toHaveBeenCalled(); + }); + + it("handles event insert returning no IDs", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + ]; + + mockTransaction.returning.mockResolvedValueOnce([]); + + await expect( + handleAddAiTokenUsage(events, "api-key-123"), + ).rejects.toThrow("Event insert returned no IDs"); + }); + + it("handles event ID count mismatch", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-2", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 200, + outputTokens: 100, + inputDebitAmount: 20, + outputDebitAmount: 10, + }, + }, + ]; + + // Return only 1 ID when expecting 2 + mockTransaction.returning.mockResolvedValueOnce([{ id: "event-1" }]); + + await expect( + handleAddAiTokenUsage(events, "api-key-123"), + ).rejects.toThrow("Expected 2 event IDs but got 1"); + }); + }); + + describe("database errors", () => { + it("handles transaction failure", async () => { + const events = [ + { + type: 
"AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + ]; + + mockDb.transaction.mockRejectedValueOnce(new Error("Transaction failed")); + + await expect( + handleAddAiTokenUsage(events, "api-key-123"), + ).rejects.toThrow("Transaction failed"); + }); + + it("handles event insert failure", async () => { + const events = [ + { + type: "AI_TOKEN_USAGE" as const, + userId: "user-1", + reported_timestamp: DateTime.now(), + data: { + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebitAmount: 10, + outputDebitAmount: 5, + }, + }, + ]; + + mockTransaction.returning.mockRejectedValueOnce( + new Error("Insert failed"), + ); + + await expect( + handleAddAiTokenUsage(events, "api-key-123"), + ).rejects.toThrow(); + }); + }); +}); From 2a80df894835ebd2ad1e8b775434e81320a3a898 Mon Sep 17 00:00:00 2001 From: Jayadeep Bejoy Date: Tue, 16 Dec 2025 23:49:23 +0400 Subject: [PATCH 4/4] test(hmac bs) Signed-off-by: Jayadeep Bejoy --- src/__tests__/setup.ts | 8 ++ .../unit/http/createdCheckout.test.ts | 79 +++++++++++++------ src/__tests__/unit/interceptors/auth.test.ts | 43 +++++----- src/utils/hashAPIKey.ts | 16 ++-- 4 files changed, 95 insertions(+), 51 deletions(-) diff --git a/src/__tests__/setup.ts b/src/__tests__/setup.ts index ff4741c..d5c7e2c 100644 --- a/src/__tests__/setup.ts +++ b/src/__tests__/setup.ts @@ -1,3 +1,11 @@ // Vitest setup file - runs before all tests // Set required environment variables before any modules are imported process.env.HMAC_SECRET = "test-secret-key-for-testing"; +process.env.LEMON_SQUEEZY_API_KEY = "test-api-key"; +process.env.LEMON_SQUEEZY_WEBHOOK_SECRET = "test-webhook-secret"; + +// Mock vi.mock for hoisted mocks +import { vi } from "vitest"; + +// Ensure vi is available globally +(globalThis as any).vi = vi; diff --git a/src/__tests__/unit/http/createdCheckout.test.ts 
b/src/__tests__/unit/http/createdCheckout.test.ts index 8d7dba9..372881f 100644 --- a/src/__tests__/unit/http/createdCheckout.test.ts +++ b/src/__tests__/unit/http/createdCheckout.test.ts @@ -3,7 +3,7 @@ import { EventEmitter } from "node:events"; import type { IncomingMessage, ServerResponse } from "node:http"; import { createHmac } from "node:crypto"; -// Shared mocks +// Shared mocks - initialize functions after vi is available const loggerMock = { logOperationInfo: vi.fn(), logOperationError: vi.fn(), @@ -12,6 +12,7 @@ const loggerMock = { }; const getStorageAdapterMock = vi.fn(); +const lemonSqueezySetupMock = vi.fn(); // Track Payment constructor calls const paymentConstructorCalls: Array<{ userId: string; data: unknown }> = []; @@ -28,24 +29,38 @@ class PaymentMock { } } -const lemonSqueezySetupMock = vi.fn(); - +// Mock modules vi.mock("../../../errors/logger.ts", () => ({ - logger: loggerMock, + logger: { + logOperationInfo: vi.fn(), + logOperationError: vi.fn(), + logWarning: vi.fn(), + logDebug: vi.fn(), + }, })); vi.mock("../../../factory/StorageAdapterFactory.ts", () => ({ StorageAdapterFactory: { - getStorageAdapter: getStorageAdapterMock, + getStorageAdapter: vi.fn(), }, })); vi.mock("../../../events/RawEvents/Payment.ts", () => ({ - Payment: PaymentMock, + Payment: class Payment { + public userId: string; + public data: unknown; + public readonly type = "PAYMENT" as const; + + constructor(userId: string, data: unknown) { + this.userId = userId; + this.data = data; + paymentConstructorCalls.push({ userId, data }); + } + }, })); vi.mock("@lemonsqueezy/lemonsqueezy.js", () => ({ - lemonSqueezySetup: lemonSqueezySetupMock, + lemonSqueezySetup: vi.fn(), })); class MockRequest extends EventEmitter { @@ -82,11 +97,20 @@ function emitBody(req: MockRequest, body: string): void { } describe("handleLemonSqueezyWebhook", () => { - beforeEach(() => { + let loggerModule: any; + let storageModule: any; + let lsModule: any; + + beforeEach(async () => { 
vi.resetModules(); vi.clearAllMocks(); paymentConstructorCalls.length = 0; + // Import mocked modules + loggerModule = await import("../../../errors/logger.ts"); + storageModule = await import("../../../factory/StorageAdapterFactory.ts"); + lsModule = await import("@lemonsqueezy/lemonsqueezy.js"); + // Default env; individual tests can override process.env.LEMON_SQUEEZY_API_KEY = "test-api-key"; process.env.LEMON_SQUEEZY_WEBHOOK_SECRET = "test-webhook-secret"; @@ -112,7 +136,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(500); expect((res as any).body).toContain("Webhook secret not configured"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "config", "MISSING_WEBHOOK_SECRET", @@ -140,7 +164,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(401); expect((res as any).body).toContain("Invalid signature"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "validate_signature", "INVALID_SIGNATURE", @@ -171,7 +195,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(400); expect((res as any).body).toContain("Invalid JSON payload"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "parse_payload", "INVALID_JSON", @@ -206,7 +230,9 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(200); expect((res as any).body).toContain("Event ignored"); - expect(getStorageAdapterMock).not.toHaveBeenCalled(); + expect( + storageModule.StorageAdapterFactory.getStorageAdapter, + ).not.toHaveBeenCalled(); expect(paymentConstructorCalls.length).toBe(0); }); @@ -239,7 +265,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res 
as any).statusCode).toBe(400); expect((res as any).body).toContain("Missing user_id in webhook payload"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "validate_payload", "MISSING_USER_ID", @@ -280,7 +306,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(400); expect((res as any).body).toContain("Missing apiKeyId in webhook payload"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "validate_payload", "MISSING_API_KEY_ID", @@ -295,9 +321,11 @@ describe("handleLemonSqueezyWebhook", () => { process.env.LEMON_SQUEEZY_WEBHOOK_SECRET = secret; const adapterAddMock = vi.fn().mockResolvedValue(undefined); - getStorageAdapterMock.mockResolvedValue({ + vi.mocked( + storageModule.StorageAdapterFactory.getStorageAdapter, + ).mockResolvedValue({ add: adapterAddMock, - }); + } as any); const handleWebhook = await importHandler(); @@ -341,9 +369,12 @@ describe("handleLemonSqueezyWebhook", () => { data: { creditAmount: 123 }, }); - expect(getStorageAdapterMock).toHaveBeenCalledTimes(1); - const adapterCall = getStorageAdapterMock.mock.calls[0]; - //@ts-ignore + expect( + storageModule.StorageAdapterFactory.getStorageAdapter, + ).toHaveBeenCalledTimes(1); + const adapterCall = vi.mocked( + storageModule.StorageAdapterFactory.getStorageAdapter, + ).mock.calls[0]; expect(adapterCall[1]).toBe("api-key-456"); expect(adapterAddMock).toHaveBeenCalledTimes(1); @@ -358,9 +389,11 @@ describe("handleLemonSqueezyWebhook", () => { const dbError = new Error("DB error"); const adapterAddMock = vi.fn().mockRejectedValue(dbError); - getStorageAdapterMock.mockResolvedValue({ + vi.mocked( + storageModule.StorageAdapterFactory.getStorageAdapter, + ).mockResolvedValue({ add: adapterAddMock, - }); + } as any); const handleWebhook = await 
importHandler(); @@ -401,7 +434,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(500); expect((res as any).body).toContain("Database error"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "database", "DATABASE_ERROR", @@ -429,7 +462,7 @@ describe("handleLemonSqueezyWebhook", () => { expect((res as any).statusCode).toBe(500); expect((res as any).body).toContain("Internal server error"); - expect(loggerMock.logOperationError).toHaveBeenCalledWith( + expect(loggerModule.logger.logOperationError).toHaveBeenCalledWith( "LemonSqueezyWebhook", "failed", "UNEXPECTED_ERROR", diff --git a/src/__tests__/unit/interceptors/auth.test.ts b/src/__tests__/unit/interceptors/auth.test.ts index 1120b31..dd7b49f 100644 --- a/src/__tests__/unit/interceptors/auth.test.ts +++ b/src/__tests__/unit/interceptors/auth.test.ts @@ -1,23 +1,9 @@ -import { describe, it, expect, vi, beforeEach, beforeAll } from "vitest"; +import { describe, it, expect, vi, beforeEach } from "vitest"; import { authInterceptor, no_auth } from "../../../interceptors/auth"; import * as dbModule from "../../../storage/db/postgres/db"; import * as hashModule from "../../../utils/hashAPIKey"; describe("authInterceptor", () => { - // Authorization that starts with Bearer and has valid format should succeed with valid DB response - const mockDb = { - select: vi.fn().mockReturnThis(), - from: vi.fn().mockReturnThis(), - where: vi.fn().mockReturnThis(), - limit: vi.fn().mockResolvedValue([ - { - id: "test-api-key-id", - expiresAt: new Date(Date.now() + 86400000).toISOString(), // expires tomorrow - revoked: false, - }, - ]), - }; - const makeReq = (auth?: string) => ({ url: "https://api.example.com/protected_endpoint", header: auth @@ -26,11 +12,28 @@ describe("authInterceptor", () => { contextValues: new Map(), }); - // Mock DB to return valid API key record - vi.spyOn(dbModule, 
"getPostgresDB").mockReturnValue({ - ...mockDb, - } as any); - vi.spyOn(hashModule, "hashAPIKey").mockReturnValue("mocked-hash"); + beforeEach(() => { + vi.clearAllMocks(); + + // Mock DB to return valid API key record + const mockDb = { + select: vi.fn().mockReturnThis(), + from: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + limit: vi.fn().mockResolvedValue([ + { + id: "test-api-key-id", + expiresAt: new Date(Date.now() + 86400000).toISOString(), // expires tomorrow + revoked: false, + }, + ]), + }; + + vi.spyOn(dbModule, "getPostgresDB").mockReturnValue({ + ...mockDb, + } as any); + vi.spyOn(hashModule, "hashAPIKey").mockReturnValue("mocked-hash"); + }); it("Ignores no_auth endpoints", async () => { const next = vi.fn().mockResolvedValue("next called"); diff --git a/src/utils/hashAPIKey.ts b/src/utils/hashAPIKey.ts index b5f476c..6be02ba 100644 --- a/src/utils/hashAPIKey.ts +++ b/src/utils/hashAPIKey.ts @@ -1,16 +1,16 @@ import { createHmac } from "crypto"; -// Retrieve and validate HMAC_SECRET at module load time -const HMAC_SECRET = process.env.HMAC_SECRET; - -if (!HMAC_SECRET) { - throw new Error("HMAC_SECRET environment variable is not set"); +// Lazily retrieve HMAC_SECRET to allow test setup to run first +function getSecret(): string { + const secret = process.env.HMAC_SECRET; + if (!secret) { + throw new Error("HMAC_SECRET environment variable is not set"); + } + return secret; } -const SECRET: string = HMAC_SECRET; - export function hashAPIKey(apiKey: string): string { - return createHmac("sha256", SECRET).update(apiKey).digest("hex"); + return createHmac("sha256", getSecret()).update(apiKey).digest("hex"); } /**