From 1d1ddc2553b08a4b4ddac7cd56c3d6ca74056126 Mon Sep 17 00:00:00 2001 From: Devyash Saini Date: Thu, 22 Jan 2026 00:23:55 +0530 Subject: [PATCH 1/4] feat: publish on release ci Signed-off-by: Devyash Saini --- .github/workflows/publish-sdk.yml | 124 ++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 .github/workflows/publish-sdk.yml diff --git a/.github/workflows/publish-sdk.yml b/.github/workflows/publish-sdk.yml new file mode 100644 index 0000000..fafeca3 --- /dev/null +++ b/.github/workflows/publish-sdk.yml @@ -0,0 +1,124 @@ +name: Publish SDK + +on: + release: + types: [published] + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Bun + uses: oven-sh/setup-bun@v1 + with: + bun-version: "1.3.2" + + - name: Install SDK dependencies + working-directory: packages/scrawn + run: bun install + + - name: Resolve and validate release version + working-directory: packages/scrawn + run: | + TAG="${GITHUB_REF_NAME}" + if [[ "${TAG}" == v* ]]; then + VERSION="${TAG#v}" + else + VERSION="${TAG}" + fi + + CURRENT_VERSION=$(node -p "require('./package.json').version") + CURRENT_VERSION="${CURRENT_VERSION}" TARGET_VERSION="${VERSION}" node - <<'NODE' + const current = process.env.CURRENT_VERSION; + const target = process.env.TARGET_VERSION; + const semverPattern = /^\d+\.\d+\.\d+(-[0-9A-Za-z.-]+)?$/; + + if (!semverPattern.test(current) || !semverPattern.test(target)) { + console.error(`Invalid version format. Current: ${current}, Target: ${target}`); + process.exit(1); + } + + const parse = (version) => { + const [core, prerelease] = version.split('-', 2); + const parts = core.split('.').map((part) => Number(part)); + return { parts, prerelease }; + }; + + const compareIdentifiers = (a, b) => { + const aNum = Number(a); + const bNum = Number(b); + const aIsNum = !Number.isNaN(aNum) && String(aNum) === a; + const bIsNum = !Number.isNaN(bNum) && String(bNum) === b; + if (aIsNum && bIsNum) return aNum - bNum; + if (aIsNum) return -1; + if (bIsNum) return 1; + return a.localeCompare(b); + }; + + const comparePrerelease = (a, b) => { + if (!a && !b) return 0; + if (!a && b) return 1; + if (a && !b) return -1; + const aParts = a.split('.'); + const bParts = b.split('.'); + const len = Math.max(aParts.length, bParts.length); + for (let i = 0; i < len; i += 1) { + const aPart = aParts[i]; + const bPart = bParts[i]; + if (aPart === undefined) return -1; + if (bPart === undefined) return 1; + const diff = compareIdentifiers(aPart, bPart); + if (diff !== 0) return diff; + } + return 0; + }; + + const compareVersions = (a, b) => { + const aParsed = parse(a); + const bParsed = parse(b); + for (let i = 0; i < 3; i += 1) { + const diff = (aParsed.parts[i] || 0) - (bParsed.parts[i] || 0); + if (diff !== 0) return diff; + } + return comparePrerelease(aParsed.prerelease, bParsed.prerelease); + }; + + const diff = compareVersions(target, current); + if (diff <= 0) { + console.error(`Release version ${target} must be higher than package.json version ${current}.`); + process.exit(1); + } + NODE + + echo "VERSION=${VERSION}" >> "$GITHUB_ENV" + + - name: Update package.json version + working-directory: packages/scrawn + run: npm version "${VERSION}" --no-git-tag-version + env: + VERSION: ${{ env.VERSION }} + + - name: Run SDK tests + working-directory: packages/scrawn + run: bun run test + + - name: Clean and build SDK + run: bash ./clean_build.sh + + - name: Publish to npm + working-directory: packages/scrawn + env: + NPM_TOKEN: ${{ secrets.NPM_TOKEN }} + VERSION: ${{ env.VERSION }} + run: | + if [[ "${VERSION}" == *"-"* ]]; then + PRERELEASE="${VERSION#*-}" + PUBLISH_TAG="${PRERELEASE%%.*}" + else + PUBLISH_TAG="latest" + fi + echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > ~/.npmrc + bun publish --tag "${PUBLISH_TAG}" --access public --no-git-checks From 4c3f338c6af70b154e2e2cb21d003de5037f2618 Mon Sep 17 00:00:00 2001 From: Devyash Saini Date: Thu, 22 Jan 2026 00:31:06 +0530 Subject: [PATCH 2/4] fix(ci): tests work regardless Signed-off-by: Devyash Saini --- .github/workflows/tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3a23b1b..aa03adb 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -2,9 +2,6 @@ name: SDK Tests on: pull_request: - paths: - - "packages/scrawn/**" - - ".github/workflows/tests.yml" jobs: sdk-tests: From 27e9a259ca468af51b6f3efd1ec693fd359c3825 Mon Sep 17 00:00:00 2001 From: Devyash Saini Date: Fri, 23 Jan 2026 11:58:03 +0530 Subject: [PATCH 3/4] feat(core): ai token stream implemented Signed-off-by: Devyash Saini --- examples/ai-token-stream-usage.ts | 73 +++++ packages/scrawn/proto/buf.gen.yaml | 16 +- packages/scrawn/proto/event/v1/event.proto | 114 +++++--- packages/scrawn/src/config.ts | 2 +- packages/scrawn/src/core/grpc/client.ts | 69 +++-- packages/scrawn/src/core/grpc/index.ts | 37 +-- .../src/core/grpc/streamRequestBuilder.ts | 133 +++++++++ packages/scrawn/src/core/scrawn.ts | 274 ++++++++++++++++-- packages/scrawn/src/core/types/event.ts | 90 +++++- .../scrawn/src/gen/auth/v1/auth_connect.ts | 2 +- packages/scrawn/src/gen/auth/v1/auth_pb.ts | 2 +- .../scrawn/src/gen/event/v1/event_connect.ts | 17 +- packages/scrawn/src/gen/event/v1/event_pb.ts | 200 ++++++++++++- .../src/gen/payment/v1/payment_connect.ts | 2 +- .../scrawn/src/gen/payment/v1/payment_pb.ts | 2 +- .../scrawn/src/utils/forkAsyncIterable.ts | 83 ++++++ .../unit/types/aiTokenUsagePayload.test.ts | 212 ++++++++++++++ .../unit/utils/forkAsyncIterable.test.ts | 208 +++++++++++++ 18 files changed, 1411 insertions(+), 125 deletions(-) create mode 100644 examples/ai-token-stream-usage.ts create mode 100644 packages/scrawn/src/core/grpc/streamRequestBuilder.ts create mode 100644 packages/scrawn/src/utils/forkAsyncIterable.ts create mode 100644 packages/scrawn/tests/unit/types/aiTokenUsagePayload.test.ts create mode 100644 packages/scrawn/tests/unit/utils/forkAsyncIterable.test.ts diff --git a/examples/ai-token-stream-usage.ts b/examples/ai-token-stream-usage.ts new file mode 100644 index 0000000..ea8c2f5 --- /dev/null +++ b/examples/ai-token-stream-usage.ts @@ -0,0 +1,73 @@ +import { Scrawn, type AITokenUsagePayload } from '@scrawn/core'; +import { config } from 'dotenv'; +config({ path: '.env.local' }); + +const scrawn = new Scrawn({ + apiKey: (process.env.SCRAWN_KEY || '') as `scrn_${string}`, + baseURL: process.env.SCRAWN_BASE_URL || 'http://localhost:8069', +}); + +// Simulate what your AI provider wrapper would do: +// As tokens stream from OpenAI/Anthropic/etc, you yield usage events +async function* tokenUsageFromAIStream(): AsyncGenerator { + const userId = 'c0971bcb-b901-4c3e-a191-c9a97871c39f'; + + // Initial prompt tokens + yield { + userId, + model: 'gpt-4', + inputTokens: 150, + outputTokens: 0, + inputDebit: { amount: 0.0045 }, + outputDebit: { amount: 0 }, + }; + + // Output tokens as they stream + yield { + userId, + model: 'gpt-4', + inputTokens: 0, + outputTokens: 75, + inputDebit: { amount: 0 }, + outputDebit: { amount: 0.0045 }, + }; +} + +// Example 1: Fire-and-forget mode (default) +// The stream is consumed and sent to backend, you just await the final response +async function fireAndForgetExample() { + console.log('--- Fire-and-forget mode ---'); + + const response = await scrawn.aiTokenStreamConsumer(tokenUsageFromAIStream()); + + console.log(`Streamed ${response.eventsProcessed} token usage events`); +} + +// Example 2: Return mode +// The stream is forked - one fork goes to billing (non-blocking), +// the other is returned to you for streaming to the user +async function returnModeExample() { + console.log('\n--- Return mode (with stream passthrough) ---'); + + const { response, stream } = await scrawn.aiTokenStreamConsumer( + tokenUsageFromAIStream(), + { return: true } + ); + + // Stream tokens to user while billing happens in background + console.log('Streaming tokens to user:'); + for await (const token of stream) { + console.log(` -> ${token.model}: input=${token.inputTokens}, output=${token.outputTokens}`); + } + + // Billing completes after stream is consumed + const result = await response; + console.log(`Billing complete: ${result.eventsProcessed} events processed`); +} + +async function main() { + await fireAndForgetExample(); + await returnModeExample(); +} + +main().catch(console.error); diff --git a/packages/scrawn/proto/buf.gen.yaml b/packages/scrawn/proto/buf.gen.yaml index 039e9a9..ba85b19 100644 --- a/packages/scrawn/proto/buf.gen.yaml +++ b/packages/scrawn/proto/buf.gen.yaml @@ -1,8 +1,8 @@ -version: v1 -plugins: - - plugin: es - out: ../src/gen - opt: target=ts - - plugin: connect-es - out: ../src/gen - opt: target=ts +version: v1 +plugins: + - plugin: buf.build/bufbuild/es:v1.10.0 + out: ../src/gen + opt: target=ts + - plugin: buf.build/connectrpc/es:v1.6.1 + out: ../src/gen + opt: target=ts diff --git a/packages/scrawn/proto/event/v1/event.proto b/packages/scrawn/proto/event/v1/event.proto index d63ef64..f2a7296 100644 --- a/packages/scrawn/proto/event/v1/event.proto +++ b/packages/scrawn/proto/event/v1/event.proto @@ -1,40 +1,74 @@ -syntax = "proto3"; - -package event.v1; - -service EventService { - // RegisterEvent registers an event as being done by a user - rpc RegisterEvent(RegisterEventRequest) returns (RegisterEventResponse) {} -} - -enum EventType { - EVENT_TYPE_UNSPECIFIED = 0; - SDK_CALL = 1; -} - -enum SDKCallType { - SDKCallType_UNSPECIFIED = 0; - RAW = 1; - MIDDLEWARE_CALL = 2; -} - -message RegisterEventRequest { - EventType type = 1; - string userId = 2; - oneof data { - SDKCall sdkCall = 3; - } -} - -message SDKCall { - SDKCallType sdkCallType = 1; - - oneof debit { - float amount = 2; - string tag = 3; - } -} - -message RegisterEventResponse { - string random = 1; -} +syntax = "proto3"; + +package event.v1; + +service EventService { + // RegisterEvent registers an event as being done by a user + rpc RegisterEvent(RegisterEventRequest) returns (RegisterEventResponse) {} + + // StreamEvents streams events from client to server (e.g., AI token usage) + rpc StreamEvents(stream StreamEventRequest) returns (StreamEventResponse) {} +} + +enum EventType { + EVENT_TYPE_UNSPECIFIED = 0; + SDK_CALL = 1; + AI_TOKEN_USAGE = 2; +} + +enum SDKCallType { + SDKCallType_UNSPECIFIED = 0; + RAW = 1; + MIDDLEWARE_CALL = 2; +} + +message RegisterEventRequest { + EventType type = 1; + string userId = 2; + oneof data { + SDKCall sdkCall = 3; + } +} + +message SDKCall { + SDKCallType sdkCallType = 1; + + oneof debit { + float amount = 2; + string tag = 3; + } +} + +message RegisterEventResponse { + string random = 1; +} + +message StreamEventRequest { + EventType type = 1; + string userId = 2; + oneof data { + SDKCall sdkCall = 3; + AITokenUsage aiTokenUsage = 4; + } +} + +message AITokenUsage { + string model = 1; + int32 inputTokens = 2; + int32 outputTokens = 3; + + oneof inputDebit { + float inputAmount = 4; + string inputTag = 5; + } + + oneof outputDebit { + float outputAmount = 6; + string outputTag = 7; + } +} + +message StreamEventResponse { + int32 eventsProcessed = 1; + string message = 2; +} diff --git a/packages/scrawn/src/config.ts b/packages/scrawn/src/config.ts index 318d9a6..46b8f58 100644 --- a/packages/scrawn/src/config.ts +++ b/packages/scrawn/src/config.ts @@ -16,7 +16,7 @@ export const ScrawnConfig = { * - Binary framing * - Built-in connection keep-alive */ - httpVersion: '2' as const, + httpVersion: '1.1' as const, /** * Enable gzip compression for request/response bodies. diff --git a/packages/scrawn/src/core/grpc/client.ts b/packages/scrawn/src/core/grpc/client.ts index 2b1db16..ea72df9 100644 --- a/packages/scrawn/src/core/grpc/client.ts +++ b/packages/scrawn/src/core/grpc/client.ts @@ -21,13 +21,14 @@ * ``` */ -import type { Transport } from '@connectrpc/connect'; -import type { ServiceType } from '@bufbuild/protobuf'; -import { createConnectTransport } from '@connectrpc/connect-node'; -import { RequestBuilder } from './requestBuilder.js'; -import { ScrawnLogger } from '../../utils/logger.js'; -import type { ServiceMethodNames } from './types.js'; -import { ScrawnConfig } from '../../config.js'; +import type { Transport } from '@connectrpc/connect'; +import type { ServiceType } from '@bufbuild/protobuf'; +import { createConnectTransport } from '@connectrpc/connect-node'; +import { RequestBuilder } from './requestBuilder.js'; +import { StreamRequestBuilder } from './streamRequestBuilder.js'; +import { ScrawnLogger } from '../../utils/logger.js'; +import type { ServiceMethodNames } from './types.js'; +import { ScrawnConfig } from '../../config.js'; const log = new ScrawnLogger('GrpcClient'); @@ -147,13 +148,47 @@ export class GrpcClient { return this.baseURL; } - /** - * Get the underlying transport (for advanced use cases). - * - * @returns The Connect transport instance - * @internal - */ - getTransport(): Transport { - return this.transport; - } -} +/** + * Get the underlying transport (for advanced use cases). + * + * @returns The Connect transport instance + * @internal + */ + getTransport(): Transport { + return this.transport; + } + + /** + * Create a new stream request builder for a client-streaming service method. + * + * This is the entry point for making client-streaming gRPC calls. The method is fully type-safe: + * - Service parameter must be a valid gRPC service + * - Method name must exist on the service (autocomplete provided) + * - Payload type is inferred from the method + * - Response type is inferred from the method + * + * @template S - The gRPC service type (auto-inferred) + * @template M - The method name (auto-inferred and validated) + * + * @param service - The gRPC service definition (e.g., EventService) + * @param method - The method name as a string literal (e.g., 'streamEvents') + * @returns A new StreamRequestBuilder for chaining headers and streaming payloads + * + * @example + * ```typescript + * // EventService.streamEvents (client-streaming) + * const response = await client + * .newStreamCall(EventService, 'streamEvents') + * .addHeader('Authorization', `Bearer ${apiKey}`) + * .stream(asyncIterableOfEvents); + * // Response type is StreamEventResponse + * ``` + */ + newStreamCall>( + service: S, + method: M + ): StreamRequestBuilder { + log.debug(`Creating new stream request builder for ${service.typeName}.${String(method)}`); + return new StreamRequestBuilder(this.transport, service, method); + } +} diff --git a/packages/scrawn/src/core/grpc/index.ts b/packages/scrawn/src/core/grpc/index.ts index d7188cc..d82f377 100644 --- a/packages/scrawn/src/core/grpc/index.ts +++ b/packages/scrawn/src/core/grpc/index.ts @@ -1,18 +1,19 @@ -/** - * gRPC abstraction layer - Type-safe fluent API for gRPC calls. - * - * This module provides a beautiful, type-safe interface for making gRPC calls - * with automatic type inference, compile-time validation, and a fluent API. - * - * @module grpc - */ - -export { GrpcClient } from './client.js'; -export { RequestBuilder } from './requestBuilder.js'; -export type { - ServiceMethodNames, - MethodInput, - MethodOutput, - Headers, - RequestState, -} from './types.js'; +/** + * gRPC abstraction layer - Type-safe fluent API for gRPC calls. + * + * This module provides a beautiful, type-safe interface for making gRPC calls + * with automatic type inference, compile-time validation, and a fluent API. + * + * @module grpc + */ + +export { GrpcClient } from './client.js'; +export { RequestBuilder } from './requestBuilder.js'; +export { StreamRequestBuilder } from './streamRequestBuilder.js'; +export type { + ServiceMethodNames, + MethodInput, + MethodOutput, + Headers, + RequestState, +} from './types.js'; diff --git a/packages/scrawn/src/core/grpc/streamRequestBuilder.ts b/packages/scrawn/src/core/grpc/streamRequestBuilder.ts new file mode 100644 index 0000000..df3cebc --- /dev/null +++ b/packages/scrawn/src/core/grpc/streamRequestBuilder.ts @@ -0,0 +1,133 @@ +/** + * Type-safe fluent API for building and executing client-streaming gRPC requests. + * + * This module provides a chain-able interface for client-streaming RPC calls that ensures: + * - Compile-time type safety for all operations + * - Autocomplete for service methods and payload fields + * - Runtime validation of request state + * - Non-blocking stream consumption with internal buffering + * + * @example + * ```typescript + * const response = await client + * .newStreamCall(EventService, 'streamEvents') + * .addHeader('Authorization', `Bearer ${apiKey}`) + * .stream(asyncIterableOfPayloads); + * ``` + */ + +import type { Client, Transport } from '@connectrpc/connect'; +import type { ServiceType } from '@bufbuild/protobuf'; +import { createClient } from '@connectrpc/connect'; +import { ScrawnLogger } from '../../utils/logger.js'; +import type { + ServiceMethodNames, + MethodInput, + MethodOutput, + Headers, +} from './types.js'; + +const log = new ScrawnLogger('StreamRequestBuilder'); + +/** + * Builder for constructing type-safe client-streaming gRPC requests. + * + * This class implements a fluent interface where each method returns `this`, + * allowing for method chaining. Type parameters ensure that the payload type + * matches the selected service method at compile time. + * + * @template S - The gRPC service type + * @template M - The method name on the service (string literal) + */ +export class StreamRequestBuilder< + S extends ServiceType, + M extends ServiceMethodNames +> { + private readonly client: Client; + private readonly methodName: M; + private readonly headers: Headers = {}; + + /** + * @internal + * Constructs a new StreamRequestBuilder. Use GrpcClient.newStreamCall() instead. + */ + constructor( + transport: Transport, + service: S, + methodName: M + ) { + // Create a typed client for this service + this.client = createClient(service, transport); + this.methodName = methodName; + } + + /** + * Add a header to the request. + * + * Headers are sent with the gRPC call and can be used for authentication, + * tracing, or custom metadata. This method can be called multiple times + * to add multiple headers. + * + * @param key - Header name (e.g., 'authorization', 'x-request-id') + * @param value - Header value + * @returns The builder instance for chaining + * + * @example + * ```typescript + * builder + * .addHeader('authorization', 'Bearer token123') + * .addHeader('x-request-id', 'abc-def-123') + * ``` + */ + addHeader(key: string, value: string): this { + this.headers[key] = value; + return this; + } + + /** + * Execute the client-streaming gRPC request with the provided async iterable. + * + * Consumes the async iterable and streams each item to the server. + * The iterable is consumed in the background to avoid blocking the caller. + * Returns a promise that resolves when the stream is complete and the server responds. + * + * @param iterable - An async iterable of request payloads to stream + * @returns A promise that resolves to the typed response + * @throws Error if the gRPC call fails + * + * @example + * ```typescript + * async function* generatePayloads() { + * yield { type: EventType.AI_TOKEN_USAGE, userId: 'u123', data: { ... } }; + * yield { type: EventType.AI_TOKEN_USAGE, userId: 'u123', data: { ... } }; + * } + * + * const response = await builder.stream(generatePayloads()); + * console.log(`Processed ${response.eventsProcessed} events`); + * ``` + */ + async stream( + iterable: AsyncIterable extends infer T ? Partial : never> + ): Promise> { + try { + log.info(`Starting client-streaming gRPC call to ${String(this.methodName)}`); + log.debug(`Headers: ${JSON.stringify(this.headers)}`); + + // The actual client-streaming gRPC call + const response = await (this.client[this.methodName] as any)( + iterable, + { headers: this.headers } + ); + + log.info(`Successfully completed streaming gRPC call to ${String(this.methodName)}`); + return response as MethodOutput; + } catch (error) { + log.error( + `Streaming gRPC call to ${String(this.methodName)} failed: ${ + error instanceof Error ? error.message : 'Unknown error' + }` + ); + throw error; + } + } +} diff --git a/packages/scrawn/src/core/scrawn.ts b/packages/scrawn/src/core/scrawn.ts index d554ac2..e5b4999 100644 --- a/packages/scrawn/src/core/scrawn.ts +++ b/packages/scrawn/src/core/scrawn.ts @@ -1,25 +1,28 @@ -import type { AuthBase } from './auth/baseAuth.js'; -import type { - EventPayload, - MiddlewareRequest, - MiddlewareResponse, - MiddlewareNext, - MiddlewareEventConfig -} from './types/event.js'; -import type { AuthRegistry, AuthMethodName, AllCredentials } from './types/auth.js'; -import { ApiKeyAuth } from './auth/apiKeyAuth.js'; -import { ScrawnLogger } from '../utils/logger.js'; -import { matchPath } from '../utils/pathMatcher.js'; -import { EventPayloadSchema } from './types/event.js'; -import { GrpcClient } from './grpc/index.js'; -import { EventService } from '../gen/event/v1/event_connect.js'; -import { EventType, SDKCallType, SDKCall } from '../gen/event/v1/event_pb.js'; -import { PaymentService } from '../gen/payment/v1/payment_connect.js'; -import { - ScrawnConfigError, - ScrawnValidationError, - convertGrpcError -} from './errors/index.js'; +import type { AuthBase } from './auth/baseAuth.js'; +import type { + EventPayload, + MiddlewareRequest, + MiddlewareResponse, + MiddlewareNext, + MiddlewareEventConfig, + AITokenUsagePayload +} from './types/event.js'; +import type { AuthRegistry, AuthMethodName, AllCredentials } from './types/auth.js'; +import { ApiKeyAuth } from './auth/apiKeyAuth.js'; +import { ScrawnLogger } from '../utils/logger.js'; +import { matchPath } from '../utils/pathMatcher.js'; +import { forkAsyncIterable } from '../utils/forkAsyncIterable.js'; +import { EventPayloadSchema, AITokenUsagePayloadSchema } from './types/event.js'; +import { GrpcClient } from './grpc/index.js'; +import { EventService } from '../gen/event/v1/event_connect.js'; +import { EventType, SDKCallType, SDKCall, AITokenUsage } from '../gen/event/v1/event_pb.js'; +import type { StreamEventResponse } from '../gen/event/v1/event_pb.js'; +import { PaymentService } from '../gen/payment/v1/payment_connect.js'; +import { + ScrawnConfigError, + ScrawnValidationError, + convertGrpcError +} from './errors/index.js'; const log = new ScrawnLogger('Scrawn'); @@ -426,6 +429,227 @@ export class Scrawn { throw convertGrpcError(error); } - if (auth.postRun) await auth.postRun(); - } -} +if (auth.postRun) await auth.postRun(); + } + + /** + * Configuration options for aiTokenStreamConsumer. + */ + // Overload signatures for aiTokenStreamConsumer + + /** + * Stream AI token usage events to the Scrawn backend (fire-and-forget mode). + * + * Consumes an async iterable of AI token usage payloads and streams them + * to the backend for billing tracking. This is designed for real-time + * AI token tracking where usage is reported as tokens are consumed. + * + * @param stream - An async iterable of AI token usage payloads + * @returns A promise that resolves to the stream response containing processed event count + * @throws Error if authentication fails or the gRPC stream fails + */ + async aiTokenStreamConsumer( + stream: AsyncIterable + ): Promise; + + /** + * Stream AI token usage events to the Scrawn backend (fire-and-forget mode). + * + * @param stream - An async iterable of AI token usage payloads + * @param config - Configuration with return: false (or omitted) + * @returns A promise that resolves to the stream response containing processed event count + */ + async aiTokenStreamConsumer( + stream: AsyncIterable, + config: { return?: false } + ): Promise; + + /** + * Stream AI token usage events to the Scrawn backend while returning a forked stream. + * + * When `return: true`, the input stream is forked: one fork is sent to the billing + * backend (non-blocking), and the other fork is returned to the caller for streaming + * to the user. This enables simultaneous billing and user-facing token streaming. + * + * @param stream - An async iterable of AI token usage payloads + * @param config - Configuration with return: true + * @returns Object containing the response promise and a forked stream for user consumption + * + * @example + * ```typescript + * const { response, stream: userStream } = await scrawn.aiTokenStreamConsumer( + * tokenGenerator(), + * { return: true } + * ); + * + * // Stream tokens to user while billing happens in background + * for await (const token of userStream) { + * process.stdout.write(token.outputTokens.toString()); + * } + * + * // Billing completes after stream is consumed + * const result = await response; + * console.log(`Billed ${result.eventsProcessed} events`); + * ``` + */ + async aiTokenStreamConsumer( + stream: AsyncIterable, + config: { return: true } + ): Promise<{ response: Promise; stream: AsyncIterable }>; + + /** + * Stream AI token usage events to the Scrawn backend. + * + * Consumes an async iterable of AI token usage payloads and streams them + * to the backend for billing tracking. This is designed for real-time + * AI token tracking where usage is reported as tokens are consumed. + * + * The streaming is non-blocking: the iterable is consumed in the background + * and streamed to the server without blocking the caller's code path. + * + * When `return: true`, the stream is forked internally - one fork goes to + * billing (non-blocking), and another is returned to the caller for streaming + * to the user. + * + * @param stream - An async iterable of AI token usage payloads + * @param config - Optional configuration object + * @param config.return - If true, returns a forked stream alongside the response promise + * @returns Depends on config.return: + * - false/undefined: Promise + * - true: { response: Promise, stream: AsyncIterable } + * @throws Error if authentication fails or the gRPC stream fails + * + * @example + * ```typescript + * // Fire-and-forget mode (default) + * async function* tokenUsageStream() { + * yield { + * userId: 'user_abc123', + * model: 'gpt-4', + * inputTokens: 100, + * outputTokens: 50, + * inputDebit: { amount: 0.003 }, + * outputDebit: { amount: 0.006 } + * }; + * } + * + * const response = await scrawn.aiTokenStreamConsumer(tokenUsageStream()); + * console.log(`Processed ${response.eventsProcessed} events`); + * + * // Return mode - stream to user while billing + * const { response, stream } = await scrawn.aiTokenStreamConsumer( + * tokenUsageStream(), + * { return: true } + * ); + * + * for await (const token of stream) { + * // Stream to user + * } + * + * const result = await response; + * ``` + */ + async aiTokenStreamConsumer( + stream: AsyncIterable, + config?: { return?: boolean } + ): Promise; stream: AsyncIterable }> { + // Get credentials for authentication + const creds = await this.getCredsFor('api'); + + // If return mode, fork the stream + if (config?.return === true) { + const [billingStream, userStream] = forkAsyncIterable(stream); + + // Transform billing stream and send to backend (non-blocking) + const transformedStream = this.transformAITokenStream(billingStream); + + const responsePromise = (async (): Promise => { + try { + log.info('Starting AI token usage stream (return mode)'); + + const response = await this.grpcClient + .newStreamCall(EventService, 'streamEvents') + .addHeader('Authorization', `Bearer ${creds.apiKey}`) + .stream(transformedStream); + + log.info(`AI token stream completed: ${response.eventsProcessed} events processed`); + return response; + } catch (error) { + log.error(`Failed to stream AI token usage: ${error instanceof Error ? error.message : 'Unknown error'}`); + throw convertGrpcError(error); + } + })(); + + return { response: responsePromise, stream: userStream }; + } + + // Default: fire-and-forget mode + const transformedStream = this.transformAITokenStream(stream); + + try { + log.info('Starting AI token usage stream'); + + const response = await this.grpcClient + .newStreamCall(EventService, 'streamEvents') + .addHeader('Authorization', `Bearer ${creds.apiKey}`) + .stream(transformedStream); + + log.info(`AI token stream completed: ${response.eventsProcessed} events processed`); + return response; + } catch (error) { + log.error(`Failed to stream AI token usage: ${error instanceof Error ? error.message : 'Unknown error'}`); + throw convertGrpcError(error); + } + } + + /** + * Transform user-provided AI token usage payloads into StreamEventRequest format. + * + * Validates each payload and maps it to the gRPC request format. + * Invalid payloads are logged and skipped. + * + * @param stream - The user's async iterable of AITokenUsagePayload + * @returns An async iterable of StreamEventRequest payloads + * @internal + */ + private async *transformAITokenStream( + stream: AsyncIterable + ) { + for await (const payload of stream) { + // Validate each payload + const validationResult = AITokenUsagePayloadSchema.safeParse(payload); + if (!validationResult.success) { + const errors = validationResult.error.issues.map(e => `${e.path.join('.')}: ${e.message}`).join(', '); + log.error(`Invalid AI token usage payload, skipping: ${errors}`); + continue; + } + + const validated = validationResult.data; + + // Build input debit field + const inputDebit = validated.inputDebit.amount !== undefined + ? { case: 'inputAmount' as const, value: validated.inputDebit.amount } + : { case: 'inputTag' as const, value: validated.inputDebit.tag! }; + + // Build output debit field + const outputDebit = validated.outputDebit.amount !== undefined + ? { case: 'outputAmount' as const, value: validated.outputDebit.amount } + : { case: 'outputTag' as const, value: validated.outputDebit.tag! }; + + yield { + type: EventType.AI_TOKEN_USAGE, + userId: validated.userId, + data: { + case: 'aiTokenUsage' as const, + value: new AITokenUsage({ + model: validated.model, + inputTokens: validated.inputTokens, + outputTokens: validated.outputTokens, + inputDebit, + outputDebit, + }), + }, + }; + } + } +} diff --git a/packages/scrawn/src/core/types/event.ts b/packages/scrawn/src/core/types/event.ts index 04f25c4..bf7c3ca 100644 --- a/packages/scrawn/src/core/types/event.ts +++ b/packages/scrawn/src/core/types/event.ts @@ -152,11 +152,85 @@ export type PayloadExtractor = ( * }; * ``` */ -export interface MiddlewareEventConfig { - /** Function to extract event payload from request. Return null to skip tracking. */ - extractor: PayloadExtractor; - /** Optional patterns to track (exact match or wildcards: * for single segment, ** for multi-segment). Takes precedence over blacklist. */ - whitelist?: string[]; - /** Optional patterns to exclude (exact match or wildcards: * for single segment, ** for multi-segment). Only applies to endpoints not in whitelist. */ - blacklist?: string[]; -} +export interface MiddlewareEventConfig { + /** Function to extract event payload from request. Return null to skip tracking. */ + extractor: PayloadExtractor; + /** Optional patterns to track (exact match or wildcards: * for single segment, ** for multi-segment). Takes precedence over blacklist. */ + whitelist?: string[]; + /** Optional patterns to exclude (exact match or wildcards: * for single segment, ** for multi-segment). Only applies to endpoints not in whitelist. */ + blacklist?: string[]; +} + +/** + * Debit field schema for AI token usage. + * + * Represents either a direct amount or a named price tag for billing. + * Exactly one of amount or tag must be provided. + */ +const DebitFieldSchema = z.object({ + amount: z.number().nonnegative('amount must be non-negative').optional(), + tag: z.string().min(1, 'tag must be a non-empty string').optional(), +}).refine( + (data) => (data.amount !== undefined) !== (data.tag !== undefined), + { message: 'Exactly one of amount or tag must be provided' } +); + +/** + * Zod schema for AI token usage payload validation. + * + * Used by aiTokenStreamConsumer to validate each token usage event. + * + * Validates: + * - userId: non-empty string + * - model: non-empty string (e.g., 'gpt-4', 'claude-3') + * - inputTokens: non-negative integer + * - outputTokens: non-negative integer + * - inputDebit: either amount (number) OR tag (string), but not both + * - outputDebit: either amount (number) OR tag (string), but not both + */ +export const AITokenUsagePayloadSchema = z.object({ + userId: z.string().min(1, 'userId must be a non-empty string'), + model: z.string().min(1, 'model must be a non-empty string'), + inputTokens: z.number().int('inputTokens must be an integer').nonnegative('inputTokens must be non-negative'), + outputTokens: z.number().int('outputTokens must be an integer').nonnegative('outputTokens must be non-negative'), + inputDebit: DebitFieldSchema, + outputDebit: DebitFieldSchema, +}); + +/** + * Payload structure for AI token usage tracking. + * + * Used by aiTokenStreamConsumer to track AI model token consumption. + * Each payload represents a single usage event (e.g., one chunk or one request). + * + * @property userId - The user ID associated with this token usage + * @property model - The AI model identifier (e.g., 'gpt-4', 'claude-3-opus') + * @property inputTokens - Number of input/prompt tokens consumed + * @property outputTokens - Number of output/completion tokens consumed + * @property inputDebit - Billing info for input tokens (amount or tag) + * @property outputDebit - Billing info for output tokens (amount or tag) + * + * @example + * ```typescript + * // Using direct amounts + * const payload1: AITokenUsagePayload = { + * userId: 'u123', + * model: 'gpt-4', + * inputTokens: 100, + * outputTokens: 50, + * inputDebit: { amount: 0.003 }, + * outputDebit: { amount: 0.006 } + * }; + * + * // Using price tags + * const payload2: AITokenUsagePayload = { + * userId: 'u123', + * model: 'claude-3-opus', + * inputTokens: 200, + * outputTokens: 100, + * inputDebit: { tag: 'CLAUDE_INPUT' }, + * outputDebit: { tag: 'CLAUDE_OUTPUT' } + * }; + * ``` + */ +export type AITokenUsagePayload = z.infer; diff --git a/packages/scrawn/src/gen/auth/v1/auth_connect.ts b/packages/scrawn/src/gen/auth/v1/auth_connect.ts index 798e784..067f11f 100644 --- a/packages/scrawn/src/gen/auth/v1/auth_connect.ts +++ b/packages/scrawn/src/gen/auth/v1/auth_connect.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-connect-es v1.7.0 with parameter "target=ts" +// @generated by protoc-gen-connect-es v1.6.1 with parameter "target=ts" // @generated from file auth/v1/auth.proto (package auth.v1, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/scrawn/src/gen/auth/v1/auth_pb.ts b/packages/scrawn/src/gen/auth/v1/auth_pb.ts index 8809dcc..5b85186 100644 --- a/packages/scrawn/src/gen/auth/v1/auth_pb.ts +++ b/packages/scrawn/src/gen/auth/v1/auth_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.10.1 with parameter "target=ts" +// @generated by protoc-gen-es v1.10.0 with parameter "target=ts" // @generated from file auth/v1/auth.proto (package auth.v1, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/scrawn/src/gen/event/v1/event_connect.ts b/packages/scrawn/src/gen/event/v1/event_connect.ts index e6aee40..234ef19 100644 --- a/packages/scrawn/src/gen/event/v1/event_connect.ts +++ b/packages/scrawn/src/gen/event/v1/event_connect.ts @@ -1,9 +1,9 @@ -// @generated by protoc-gen-connect-es v1.7.0 with parameter "target=ts" +// @generated by protoc-gen-connect-es v1.6.1 with parameter "target=ts" // @generated from file event/v1/event.proto (package event.v1, syntax proto3) /* eslint-disable */ // @ts-nocheck -import { RegisterEventRequest, RegisterEventResponse } from "./event_pb.js"; +import { RegisterEventRequest, RegisterEventResponse, StreamEventRequest, StreamEventResponse } from "./event_pb.js"; import { MethodKind } from "@bufbuild/protobuf"; /** @@ -13,7 +13,7 @@ export const EventService = { typeName: "event.v1.EventService", methods: { /** - * RegisterEvent registers an event as being done by a user + * RegisterEvent registers an event as being done by a user * * @generated from rpc event.v1.EventService.RegisterEvent */ @@ -23,6 +23,17 @@ export const EventService = { O: RegisterEventResponse, kind: MethodKind.Unary, }, + /** + * StreamEvents streams events from client to server (e.g., AI token usage) + * + * @generated from rpc event.v1.EventService.StreamEvents + */ + streamEvents: { + name: "StreamEvents", + I: StreamEventRequest, + O: StreamEventResponse, + kind: MethodKind.ClientStreaming, + }, } } as const; diff --git a/packages/scrawn/src/gen/event/v1/event_pb.ts b/packages/scrawn/src/gen/event/v1/event_pb.ts index 628ca37..4e31d58 100644 --- a/packages/scrawn/src/gen/event/v1/event_pb.ts +++ b/packages/scrawn/src/gen/event/v1/event_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.10.1 with parameter "target=ts" +// @generated by protoc-gen-es v1.10.0 with parameter "target=ts" // @generated from file event/v1/event.proto (package event.v1, syntax proto3) /* eslint-disable */ // @ts-nocheck @@ -19,11 +19,17 @@ export enum EventType { * @generated from enum value: SDK_CALL = 1; */ SDK_CALL = 1, + + /** + * @generated from enum value: AI_TOKEN_USAGE = 2; + */ + AI_TOKEN_USAGE = 2, } // Retrieve enum metadata with: proto3.getEnumType(EventType) proto3.util.setEnumType(EventType, "event.v1.EventType", [ { no: 0, name: "EVENT_TYPE_UNSPECIFIED" }, { no: 1, name: "SDK_CALL" }, + { no: 2, name: "AI_TOKEN_USAGE" }, ]); /** @@ -200,3 +206,195 @@ export class RegisterEventResponse extends Message { } } +/** + * @generated from message event.v1.StreamEventRequest + */ +export class StreamEventRequest extends Message { + /** + * @generated from field: event.v1.EventType type = 1; + */ + type = EventType.EVENT_TYPE_UNSPECIFIED; + + /** + * @generated from field: string userId = 2; + */ + userId = ""; + + /** + * @generated from oneof event.v1.StreamEventRequest.data + */ + data: { + /** + * @generated from field: event.v1.SDKCall sdkCall = 3; + */ + value: SDKCall; + case: "sdkCall"; + } | { + /** + * @generated from field: event.v1.AITokenUsage aiTokenUsage = 4; + */ + value: AITokenUsage; + case: "aiTokenUsage"; + } | { case: undefined; value?: undefined } = { case: undefined }; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "event.v1.StreamEventRequest"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "type", kind: "enum", T: proto3.getEnumType(EventType) }, + { no: 2, name: "userId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "sdkCall", kind: "message", T: SDKCall, oneof: "data" }, + { no: 4, name: "aiTokenUsage", kind: "message", T: AITokenUsage, oneof: "data" }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): StreamEventRequest { + return new StreamEventRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): StreamEventRequest { + return new StreamEventRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): StreamEventRequest { + return new StreamEventRequest().fromJsonString(jsonString, options); + } + + static equals(a: StreamEventRequest | PlainMessage | undefined, b: StreamEventRequest | PlainMessage | undefined): boolean { + return proto3.util.equals(StreamEventRequest, a, b); + } +} + +/** + * @generated from message event.v1.AITokenUsage + */ +export class AITokenUsage extends Message { + /** + * @generated from field: string model = 1; + */ + model = ""; + + /** + * @generated from field: int32 inputTokens = 2; + */ + inputTokens = 0; + + /** + * @generated from field: int32 outputTokens = 3; + */ + outputTokens = 0; + + /** + * @generated from oneof event.v1.AITokenUsage.inputDebit + */ + inputDebit: { + /** + * @generated from field: float inputAmount = 4; + */ + value: number; + case: "inputAmount"; + } | { + /** + * @generated from field: string inputTag = 5; + */ + value: string; + case: "inputTag"; + } | { case: undefined; value?: undefined } = { case: undefined }; + + /** + * @generated from oneof event.v1.AITokenUsage.outputDebit + */ + outputDebit: { + /** + * @generated from field: float outputAmount = 6; + */ + value: number; + case: "outputAmount"; + } | { + /** + * @generated from field: string outputTag = 7; + */ + value: string; + case: "outputTag"; + } | { case: undefined; value?: undefined } = { case: undefined }; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "event.v1.AITokenUsage"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "model", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "inputTokens", kind: "scalar", T: 5 /* ScalarType.INT32 */ }, + { no: 3, name: "outputTokens", kind: "scalar", T: 5 /* ScalarType.INT32 */ }, + { no: 4, name: "inputAmount", kind: "scalar", T: 2 /* ScalarType.FLOAT */, oneof: "inputDebit" }, + { no: 5, name: "inputTag", kind: "scalar", T: 9 /* ScalarType.STRING */, oneof: "inputDebit" }, + { no: 6, name: "outputAmount", kind: "scalar", T: 2 /* ScalarType.FLOAT */, oneof: "outputDebit" }, + { no: 7, name: "outputTag", kind: "scalar", T: 9 /* ScalarType.STRING */, oneof: "outputDebit" }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): AITokenUsage { + return new AITokenUsage().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): AITokenUsage { + return new AITokenUsage().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): AITokenUsage { + return new AITokenUsage().fromJsonString(jsonString, options); + } + + static equals(a: AITokenUsage | PlainMessage | undefined, b: AITokenUsage | PlainMessage | undefined): boolean { + return proto3.util.equals(AITokenUsage, a, b); + } +} + +/** + * @generated from message event.v1.StreamEventResponse + */ +export class StreamEventResponse extends Message { + /** + * @generated from field: int32 eventsProcessed = 1; + */ + eventsProcessed = 0; + + /** + * @generated from field: string message = 2; + */ + message = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "event.v1.StreamEventResponse"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "eventsProcessed", kind: "scalar", T: 5 /* ScalarType.INT32 */ }, + { no: 2, name: "message", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): StreamEventResponse { + return new StreamEventResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): StreamEventResponse { + return new StreamEventResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): StreamEventResponse { + return new StreamEventResponse().fromJsonString(jsonString, options); + } + + static equals(a: StreamEventResponse | PlainMessage | undefined, b: StreamEventResponse | PlainMessage | undefined): boolean { + return proto3.util.equals(StreamEventResponse, a, b); + } +} + diff --git a/packages/scrawn/src/gen/payment/v1/payment_connect.ts b/packages/scrawn/src/gen/payment/v1/payment_connect.ts index ad9ebba..9ce4621 100644 --- a/packages/scrawn/src/gen/payment/v1/payment_connect.ts +++ b/packages/scrawn/src/gen/payment/v1/payment_connect.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-connect-es v1.7.0 with parameter "target=ts" +// @generated by protoc-gen-connect-es v1.6.1 with parameter "target=ts" // @generated from file payment/v1/payment.proto (package payment.v1, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/scrawn/src/gen/payment/v1/payment_pb.ts b/packages/scrawn/src/gen/payment/v1/payment_pb.ts index 31a6b76..69abd20 100644 --- a/packages/scrawn/src/gen/payment/v1/payment_pb.ts +++ b/packages/scrawn/src/gen/payment/v1/payment_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.10.1 with parameter "target=ts" +// @generated by protoc-gen-es v1.10.0 with parameter "target=ts" // @generated from file payment/v1/payment.proto (package payment.v1, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/scrawn/src/utils/forkAsyncIterable.ts b/packages/scrawn/src/utils/forkAsyncIterable.ts new file mode 100644 index 0000000..604f816 --- /dev/null +++ b/packages/scrawn/src/utils/forkAsyncIterable.ts @@ -0,0 +1,83 @@ +/** + * Forks an async iterable into two independent async iterables. + * + * Both returned iterables will receive the same items from the source. + * Items are buffered internally so both consumers can read at their own pace. + * + * @param source - The source async iterable to fork + * @returns A tuple of two async iterables that both yield the same items + * + * @internal + */ +export function forkAsyncIterable( + source: AsyncIterable +): [AsyncIterable, AsyncIterable] { + const buffer1: T[] = []; + const buffer2: T[] = []; + + let resolve1: (() => void) | null = null; + let resolve2: (() => void) | null = null; + + let done = false; + let started = false; + + // Start consuming the source and push to both buffers + async function startConsuming() { + if (started) return; + started = true; + + try { + for await (const item of source) { + buffer1.push(item); + buffer2.push(item); + + // Wake up any waiting consumers + if (resolve1) { + resolve1(); + resolve1 = null; + } + if (resolve2) { + resolve2(); + resolve2 = null; + } + } + } finally { + done = true; + // Wake up consumers so they can finish + if (resolve1) { + resolve1(); + resolve1 = null; + } + if (resolve2) { + resolve2(); + resolve2 = null; + } + } + } + + async function* createIterator( + buffer: T[], + setResolve: (r: (() => void) | null) => void + ): AsyncGenerator { + // Kick off consuming (idempotent) + startConsuming(); + + while (true) { + if (buffer.length > 0) { + yield buffer.shift()!; + } else if (done) { + return; + } else { + // Wait for new item or done + await new Promise((resolve) => { + setResolve(resolve); + }); + } + } + } + + const iter1 = createIterator(buffer1, (r) => { resolve1 = r; }); + const iter2 = createIterator(buffer2, (r) => { resolve2 = r; }); + + return [iter1, iter2]; +} diff --git a/packages/scrawn/tests/unit/types/aiTokenUsagePayload.test.ts b/packages/scrawn/tests/unit/types/aiTokenUsagePayload.test.ts new file mode 100644 index 0000000..fc61584 --- /dev/null +++ b/packages/scrawn/tests/unit/types/aiTokenUsagePayload.test.ts @@ -0,0 +1,212 @@ +import { describe, expect, it } from "vitest"; +import { AITokenUsagePayloadSchema } from "../../../src/core/types/event.js"; + +describe("AITokenUsagePayloadSchema", () => { + describe("valid payloads", () => { + it("accepts payloads with amount-based debits", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(true); + }); + + it("accepts payloads with tag-based debits", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "claude-3-opus", + inputTokens: 200, + outputTokens: 100, + inputDebit: { tag: "CLAUDE_INPUT" }, + outputDebit: { tag: "CLAUDE_OUTPUT" }, + }); + + expect(result.success).toBe(true); + }); + + it("accepts payloads with mixed debit types", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { tag: "OUTPUT_TAG" }, + }); + + expect(result.success).toBe(true); + }); + + it("accepts payloads with zero tokens", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 0, + outputTokens: 0, + inputDebit: { amount: 0 }, + outputDebit: { amount: 0 }, + }); + + expect(result.success).toBe(true); + }); + }); + + describe("invalid payloads", () => { + it("rejects payloads with empty userId", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with empty model", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with negative inputTokens", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: -10, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with negative outputTokens", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: -5, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with non-integer tokens", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100.5, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with both amount and tag in inputDebit", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003, tag: "INPUT_TAG" }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with both amount and tag in outputDebit", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: { amount: 0.006, tag: "OUTPUT_TAG" }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with empty inputDebit", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: {}, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with empty outputDebit", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: 0.003 }, + outputDebit: {}, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with negative debit amount", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { amount: -0.003 }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads with empty debit tag", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + inputTokens: 100, + outputTokens: 50, + inputDebit: { tag: "" }, + outputDebit: { amount: 0.006 }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects payloads missing required fields", () => { + const result = AITokenUsagePayloadSchema.safeParse({ + userId: "user_1", + model: "gpt-4", + }); + + expect(result.success).toBe(false); + }); + }); +}); diff --git a/packages/scrawn/tests/unit/utils/forkAsyncIterable.test.ts b/packages/scrawn/tests/unit/utils/forkAsyncIterable.test.ts new file mode 100644 index 0000000..b088c82 --- /dev/null +++ b/packages/scrawn/tests/unit/utils/forkAsyncIterable.test.ts @@ -0,0 +1,208 @@ +import { describe, expect, it } from "vitest"; +import { forkAsyncIterable } from "../../../src/utils/forkAsyncIterable.js"; + +describe("forkAsyncIterable", () => { + it("both iterables receive all items", async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const results1: number[] = []; + const results2: number[] = []; + + for await (const item of iter1) { + results1.push(item); + } + + for await (const item of iter2) { + results2.push(item); + } + + expect(results1).toEqual([1, 2, 3]); + expect(results2).toEqual([1, 2, 3]); + }); + + it("iterables can be consumed concurrently", async () => { + async function* source() { + yield "a"; + yield "b"; + yield "c"; + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const [results1, results2] = await Promise.all([ + (async () => { + const items: string[] = []; + for await (const item of iter1) { + items.push(item); + } + return items; + })(), + (async () => { + const items: string[] = []; + for await (const item of iter2) { + items.push(item); + } + return items; + })(), + ]); + + expect(results1).toEqual(["a", "b", "c"]); + expect(results2).toEqual(["a", "b", "c"]); + }); + + it("handles empty source", async () => { + async function* source(): AsyncGenerator { + // Empty generator + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const results1: number[] = []; + const results2: number[] = []; + + for await (const item of iter1) { + results1.push(item); + } + + for await (const item of iter2) { + results2.push(item); + } + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + }); + + it("handles single item", async () => { + async function* source() { + yield "only"; + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const results1: string[] = []; + const results2: string[] = []; + + for await (const item of iter1) { + results1.push(item); + } + + for await (const item of iter2) { + results2.push(item); + } + + expect(results1).toEqual(["only"]); + expect(results2).toEqual(["only"]); + }); + + it("one slow consumer does not block the other", async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const consumptionOrder: string[] = []; + + await Promise.all([ + (async () => { + for await (const item of iter1) { + consumptionOrder.push(`fast-${item}`); + } + })(), + (async () => { + for await (const item of iter2) { + // Slow consumer - waits a bit between items + await new Promise((r) => setTimeout(r, 10)); + consumptionOrder.push(`slow-${item}`); + } + })(), + ]); + + // Fast consumer should finish its items before slow consumer + // The exact order depends on timing, but both should get all items + expect(consumptionOrder.filter((x) => x.startsWith("fast-"))).toEqual([ + "fast-1", + "fast-2", + "fast-3", + ]); + expect(consumptionOrder.filter((x) => x.startsWith("slow-"))).toEqual([ + "slow-1", + "slow-2", + "slow-3", + ]); + }); + + it("works with complex objects", async () => { + interface TokenEvent { + userId: string; + tokens: number; + } + + async function* source(): AsyncGenerator { + yield { userId: "u1", tokens: 100 }; + yield { userId: "u2", tokens: 200 }; + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const results1: TokenEvent[] = []; + const results2: TokenEvent[] = []; + + for await (const item of iter1) { + results1.push(item); + } + + for await (const item of iter2) { + results2.push(item); + } + + expect(results1).toEqual([ + { userId: "u1", tokens: 100 }, + { userId: "u2", tokens: 200 }, + ]); + expect(results2).toEqual([ + { userId: "u1", tokens: 100 }, + { userId: "u2", tokens: 200 }, + ]); + }); + + it("handles delayed source items", async () => { + async function* source() { + yield 1; + await new Promise((r) => setTimeout(r, 10)); + yield 2; + await new Promise((r) => setTimeout(r, 10)); + yield 3; + } + + const [iter1, iter2] = forkAsyncIterable(source()); + + const [results1, results2] = await Promise.all([ + (async () => { + const items: number[] = []; + for await (const item of iter1) { + items.push(item); + } + return items; + })(), + (async () => { + const items: number[] = []; + for await (const item of iter2) { + items.push(item); + } + return items; + })(), + ]); + + expect(results1).toEqual([1, 2, 3]); + expect(results2).toEqual([1, 2, 3]); + }); +}); From c6ddf7c58dd5d53731539f066656a434ddb36523 Mon Sep 17 00:00:00 2001 From: Jayadeep Bejoy <88953813+SteakFisher@users.noreply.github.com> Date: Fri, 23 Jan 2026 12:57:06 +0530 Subject: [PATCH 4/4] Remove as any Signed-off-by: Jayadeep Bejoy <88953813+SteakFisher@users.noreply.github.com> --- packages/scrawn/src/core/grpc/streamRequestBuilder.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/scrawn/src/core/grpc/streamRequestBuilder.ts b/packages/scrawn/src/core/grpc/streamRequestBuilder.ts index df3cebc..c5d6d47 100644 --- a/packages/scrawn/src/core/grpc/streamRequestBuilder.ts +++ b/packages/scrawn/src/core/grpc/streamRequestBuilder.ts @@ -114,7 +114,7 @@ export class StreamRequestBuilder< log.debug(`Headers: ${JSON.stringify(this.headers)}`); // The actual client-streaming gRPC call - const response = await (this.client[this.methodName] as any)( + const response = await (this.client[this.methodName])( iterable, { headers: this.headers } );