Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions examples/ai-token-stream-usage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Scrawn, type AITokenUsagePayload } from '@scrawn/core';
import { config } from 'dotenv';
config({ path: '.env.local' });

const scrawn = new Scrawn({
apiKey: (process.env.SCRAWN_KEY || '') as `scrn_${string}`,
baseURL: process.env.SCRAWN_BASE_URL || 'http://localhost:8069',
});

// Simulate what your AI provider wrapper would do:
// As tokens stream from OpenAI/Anthropic/etc, you yield usage events
async function* tokenUsageFromAIStream(): AsyncGenerator<AITokenUsagePayload> {
const userId = 'c0971bcb-b901-4c3e-a191-c9a97871c39f';

// Initial prompt tokens
yield {
userId,
model: 'gpt-4',
inputTokens: 150,
outputTokens: 0,
inputDebit: { amount: 0.0045 },
outputDebit: { amount: 0 },
};

// Output tokens as they stream
yield {
userId,
model: 'gpt-4',
inputTokens: 0,
outputTokens: 75,
inputDebit: { amount: 0 },
outputDebit: { amount: 0.0045 },
};
}

// Example 1: Fire-and-forget mode (default)
// The stream is consumed and sent to backend, you just await the final response
async function fireAndForgetExample() {
console.log('--- Fire-and-forget mode ---');

const response = await scrawn.aiTokenStreamConsumer(tokenUsageFromAIStream());

console.log(`Streamed ${response.eventsProcessed} token usage events`);
}

// Example 2: Return mode
// The stream is forked - one fork goes to billing (non-blocking),
// the other is returned to you for streaming to the user
async function returnModeExample() {
console.log('\n--- Return mode (with stream passthrough) ---');

const { response, stream } = await scrawn.aiTokenStreamConsumer(
tokenUsageFromAIStream(),
{ return: true }
);

// Stream tokens to user while billing happens in background
console.log('Streaming tokens to user:');
for await (const token of stream) {
console.log(` -> ${token.model}: input=${token.inputTokens}, output=${token.outputTokens}`);
}

// Billing completes after stream is consumed
const result = await response;
console.log(`Billing complete: ${result.eventsProcessed} events processed`);
}

async function main() {
await fireAndForgetExample();
await returnModeExample();
}

main().catch(console.error);
16 changes: 8 additions & 8 deletions packages/scrawn/proto/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
version: v1
plugins:
- plugin: es
out: ../src/gen
opt: target=ts
- plugin: connect-es
out: ../src/gen
opt: target=ts
version: v1
plugins:
- plugin: buf.build/bufbuild/es:v1.10.0
out: ../src/gen
opt: target=ts
- plugin: buf.build/connectrpc/es:v1.6.1
out: ../src/gen
opt: target=ts
114 changes: 74 additions & 40 deletions packages/scrawn/proto/event/v1/event.proto
Original file line number Diff line number Diff line change
@@ -1,40 +1,74 @@
syntax = "proto3";

package event.v1;

service EventService {
// RegisterEvent registers an event as being done by a user
rpc RegisterEvent(RegisterEventRequest) returns (RegisterEventResponse) {}
}

enum EventType {
EVENT_TYPE_UNSPECIFIED = 0;
SDK_CALL = 1;
}

enum SDKCallType {
SDKCallType_UNSPECIFIED = 0;
RAW = 1;
MIDDLEWARE_CALL = 2;
}

message RegisterEventRequest {
EventType type = 1;
string userId = 2;
oneof data {
SDKCall sdkCall = 3;
}
}

message SDKCall {
SDKCallType sdkCallType = 1;

oneof debit {
float amount = 2;
string tag = 3;
}
}

message RegisterEventResponse {
string random = 1;
}
syntax = "proto3";

package event.v1;

service EventService {
// RegisterEvent registers an event as being done by a user
rpc RegisterEvent(RegisterEventRequest) returns (RegisterEventResponse) {}

// StreamEvents streams events from client to server (e.g., AI token usage)
rpc StreamEvents(stream StreamEventRequest) returns (StreamEventResponse) {}
}

enum EventType {
EVENT_TYPE_UNSPECIFIED = 0;
SDK_CALL = 1;
AI_TOKEN_USAGE = 2;
}

enum SDKCallType {
SDKCallType_UNSPECIFIED = 0;
RAW = 1;
MIDDLEWARE_CALL = 2;
}

message RegisterEventRequest {
EventType type = 1;
string userId = 2;
oneof data {
SDKCall sdkCall = 3;
}
}

message SDKCall {
SDKCallType sdkCallType = 1;

oneof debit {
float amount = 2;
string tag = 3;
}
}

message RegisterEventResponse {
string random = 1;
}

message StreamEventRequest {
EventType type = 1;
string userId = 2;
oneof data {
SDKCall sdkCall = 3;
AITokenUsage aiTokenUsage = 4;
}
}

message AITokenUsage {
string model = 1;
int32 inputTokens = 2;
int32 outputTokens = 3;

oneof inputDebit {
float inputAmount = 4;
string inputTag = 5;
}

oneof outputDebit {
float outputAmount = 6;
string outputTag = 7;
}
}

message StreamEventResponse {
int32 eventsProcessed = 1;
string message = 2;
}
2 changes: 1 addition & 1 deletion packages/scrawn/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const ScrawnConfig = {
* - Binary framing
* - Built-in connection keep-alive
*/
httpVersion: '2' as const,
httpVersion: '1.1' as const,

/**
* Enable gzip compression for request/response bodies.
Expand Down
69 changes: 52 additions & 17 deletions packages/scrawn/src/core/grpc/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
* ```
*/

import type { Transport } from '@connectrpc/connect';
import type { ServiceType } from '@bufbuild/protobuf';
import { createConnectTransport } from '@connectrpc/connect-node';
import { RequestBuilder } from './requestBuilder.js';
import { ScrawnLogger } from '../../utils/logger.js';
import type { ServiceMethodNames } from './types.js';
import { ScrawnConfig } from '../../config.js';
import type { Transport } from '@connectrpc/connect';
import type { ServiceType } from '@bufbuild/protobuf';
import { createConnectTransport } from '@connectrpc/connect-node';
import { RequestBuilder } from './requestBuilder.js';
import { StreamRequestBuilder } from './streamRequestBuilder.js';
import { ScrawnLogger } from '../../utils/logger.js';
import type { ServiceMethodNames } from './types.js';
import { ScrawnConfig } from '../../config.js';

const log = new ScrawnLogger('GrpcClient');

Expand Down Expand Up @@ -147,13 +148,47 @@ export class GrpcClient {
return this.baseURL;
}

/**
* Get the underlying transport (for advanced use cases).
*
* @returns The Connect transport instance
* @internal
*/
getTransport(): Transport {
return this.transport;
}
}
/**
* Get the underlying transport (for advanced use cases).
*
* @returns The Connect transport instance
* @internal
*/
getTransport(): Transport {
return this.transport;
}

/**
* Create a new stream request builder for a client-streaming service method.
*
* This is the entry point for making client-streaming gRPC calls. The method is fully type-safe:
* - Service parameter must be a valid gRPC service
* - Method name must exist on the service (autocomplete provided)
* - Payload type is inferred from the method
* - Response type is inferred from the method
*
* @template S - The gRPC service type (auto-inferred)
* @template M - The method name (auto-inferred and validated)
*
* @param service - The gRPC service definition (e.g., EventService)
* @param method - The method name as a string literal (e.g., 'streamEvents')
* @returns A new StreamRequestBuilder for chaining headers and streaming payloads
*
* @example
* ```typescript
* // EventService.streamEvents (client-streaming)
* const response = await client
* .newStreamCall(EventService, 'streamEvents')
* .addHeader('Authorization', `Bearer ${apiKey}`)
* .stream(asyncIterableOfEvents);
* // Response type is StreamEventResponse
* ```
*/
newStreamCall<S extends ServiceType, M extends ServiceMethodNames<S>>(
service: S,
method: M
): StreamRequestBuilder<S, M> {
log.debug(`Creating new stream request builder for ${service.typeName}.${String(method)}`);
return new StreamRequestBuilder<S, M>(this.transport, service, method);
}
}
37 changes: 19 additions & 18 deletions packages/scrawn/src/core/grpc/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
/**
* gRPC abstraction layer - Type-safe fluent API for gRPC calls.
*
* This module provides a beautiful, type-safe interface for making gRPC calls
* with automatic type inference, compile-time validation, and a fluent API.
*
* @module grpc
*/

export { GrpcClient } from './client.js';
export { RequestBuilder } from './requestBuilder.js';
export type {
ServiceMethodNames,
MethodInput,
MethodOutput,
Headers,
RequestState,
} from './types.js';
/**
* gRPC abstraction layer - Type-safe fluent API for gRPC calls.
*
* This module provides a beautiful, type-safe interface for making gRPC calls
* with automatic type inference, compile-time validation, and a fluent API.
*
* @module grpc
*/

export { GrpcClient } from './client.js';
export { RequestBuilder } from './requestBuilder.js';
export { StreamRequestBuilder } from './streamRequestBuilder.js';
export type {
ServiceMethodNames,
MethodInput,
MethodOutput,
Headers,
RequestState,
} from './types.js';
Loading
Loading