diff --git a/.changeset/swift-pandas-sing.md b/.changeset/swift-pandas-sing.md new file mode 100644 index 0000000..1e963f5 --- /dev/null +++ b/.changeset/swift-pandas-sing.md @@ -0,0 +1,5 @@ +--- +"ff-effect": patch +--- + +Add Effect wrappers for AI SDK (generateText, streamText, tool) diff --git a/bun.lock b/bun.lock index 4e8a982..8d4f950 100644 --- a/bun.lock +++ b/bun.lock @@ -52,6 +52,7 @@ "@total-typescript/tsconfig": "^1.0.4", "@types/bun": "^1.3.2", "@typescript/native-preview": "^7.0.0-dev.20260122.4", + "ai": "^6.0.102", "inngest": "^3.52.3", "tsup": "^8.5.0", "typescript": "^5.9.3", @@ -96,7 +97,7 @@ }, "packages/serv": { "name": "ff-serv", - "version": "0.1.10", + "version": "0.1.11", "bin": { "ff-serv": "./dist/cli.js", }, @@ -1655,6 +1656,8 @@ "ff-effect/@effect/platform": ["@effect/platform@0.94.2", "", { "dependencies": { "find-my-way-ts": "^0.1.6", "msgpackr": "^1.11.4", "multipasta": "^0.2.7" }, "peerDependencies": { "effect": "^3.19.15" } }, "sha512-85vdwpnK4oH/rJ3EuX/Gi2Hkt+K4HvXWr9bxCuqvty9hxyEcRxkJcqTesYrcVoQB6aULb1Za2B0MKoTbvffB3Q=="], + "ff-effect/ai": ["ai@6.0.102", "", { "dependencies": { "@ai-sdk/gateway": "3.0.56", "@ai-sdk/provider": "3.0.8", "@ai-sdk/provider-utils": "4.0.15", "@opentelemetry/api": "1.9.0" }, "peerDependencies": { "zod": "^3.25.76 || ^4.1.8" } }, "sha512-oO0hFDSiMwqFONv+LwkuvqFHvR6nRiy/T9ndMkWxxc5KwXNim+DIqqfWLWRm6y1Qm7f4jHICatci123jkDjlAg=="], + "ff-serv/@effect/platform": ["@effect/platform@0.94.2", "", { "dependencies": { "find-my-way-ts": "^0.1.6", "msgpackr": "^1.11.4", "multipasta": "^0.2.7" }, "peerDependencies": { "effect": "^3.19.15" } }, "sha512-85vdwpnK4oH/rJ3EuX/Gi2Hkt+K4HvXWr9bxCuqvty9hxyEcRxkJcqTesYrcVoQB6aULb1Za2B0MKoTbvffB3Q=="], "inngest/@opentelemetry/resources": ["@opentelemetry/resources@2.4.0", "", { "dependencies": { "@opentelemetry/core": "2.4.0", "@opentelemetry/semantic-conventions": "^1.29.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.3.0 <1.10.0" } }, "sha512-RWvGLj2lMDZd7M/5tjkI/2VHMpXebLgPKvBUd9LRasEWR2xAynDwEYZuLvY9P2NGG73HF07jbbgWX2C9oavcQg=="], @@ -1849,6 +1852,12 @@ "enquirer/strip-ansi/ansi-regex": ["ansi-regex@5.0.1", "", {}, "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ=="], + "ff-effect/ai/@ai-sdk/gateway": ["@ai-sdk/gateway@3.0.56", "", { "dependencies": { "@ai-sdk/provider": "3.0.8", "@ai-sdk/provider-utils": "4.0.15", "@vercel/oidc": "3.1.0" }, "peerDependencies": { "zod": "^3.25.76 || ^4.1.8" } }, "sha512-8afq9Oyh5f6LKzER6ayuymnX0NUpeYG7HFLHFHBYqC9n1zxBdv5bYqattOX6ajb2Yn7LfWFbEs1sjrDEJys6KQ=="], + + "ff-effect/ai/@ai-sdk/provider": ["@ai-sdk/provider@3.0.8", "", { "dependencies": { "json-schema": "^0.4.0" } }, "sha512-oGMAgGoQdBXbZqNG0Ze56CHjDZ1IDYOwGYxYjO5KLSlz5HiNQ9udIXsPZ61VWaHGZ5XW/jyjmr6t2xz2jGVwbQ=="], + + "ff-effect/ai/@ai-sdk/provider-utils": ["@ai-sdk/provider-utils@4.0.15", "", { "dependencies": { "@ai-sdk/provider": "3.0.8", "@standard-schema/spec": "^1.1.0", "eventsource-parser": "^3.0.6" }, "peerDependencies": { "zod": "^3.25.76 || ^4.1.8" } }, "sha512-8XiKWbemmCbvNN0CLR9u3PQiet4gtEVIrX4zzLxnCj06AwsEDJwJVBbKrEI4t6qE8XRSIvU2irka0dcpziKW6w=="], + "inngest/@opentelemetry/resources/@opentelemetry/core": ["@opentelemetry/core@2.4.0", "", { "dependencies": { "@opentelemetry/semantic-conventions": "^1.29.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-KtcyFHssTn5ZgDu6SXmUznS80OFs/wN7y6MyFRRcKU6TOw8hNcGxKvt8hsdaLJfhzUszNSjURetq5Qpkad14Gw=="], "read-yaml-file/js-yaml/argparse": ["argparse@1.0.10", "", { "dependencies": { "sprintf-js": "~1.0.2" } }, "sha512-o5Roy6tNG4SL/FOkCAN6RzjiakZS25RLYFrcMttJqbdd8BWrnA+fGz57iN5Pb06pvBGvl5gQ0B48dJlslXvoTg=="], diff --git a/packages/effect/docs/for/ai.md b/packages/effect/docs/for/ai.md new file mode 100644 index 0000000..0712d12 --- /dev/null +++ b/packages/effect/docs/for/ai.md @@ -0,0 +1,130 @@ +# ff-effect/for/ai + +Effect wrappers for AI SDK's `generateText`, `streamText`, and `tool`. + +## Basic `generateText` + +```ts +import { generateText } from "ff-effect/for/ai" +import { openai } from "@ai-sdk/openai" +import { Effect } from "effect" + +const program = generateText({ + model: openai("gpt-4o"), + prompt: "What is the capital of France?", +}) + +const result = await Effect.runPromise(program) +console.log(result.text) +``` + +## `generateText` with `onFinish` callback + +```ts +import { generateText } from "ff-effect/for/ai" +import { openai } from "@ai-sdk/openai" +import { Effect } from "effect" + +const program = generateText({ + model: openai("gpt-4o"), + prompt: "Summarize the water cycle.", + onFinish: (result) => + Effect.sync(() => { + console.log("Finished, tokens used:", result.usage.totalTokens) + }), +}) + +const result = await Effect.runPromise(program) +console.log(result.text) +``` + +## Basic `streamText` + +Requires `Effect.scoped` because streaming outlives the initial call. + +```ts +import { streamText } from "ff-effect/for/ai" +import { openai } from "@ai-sdk/openai" +import { Effect } from "effect" + +const program = Effect.scoped( + Effect.gen(function* () { + const result = yield* streamText({ + model: openai("gpt-4o"), + prompt: "Tell me a short story.", + }) + + for await (const chunk of result.textStream) { + process.stdout.write(chunk) + } + }) +) + +await Effect.runPromise(program) +``` + +## `tool` with an Effect service + +`tool()` returns an `Effect.Effect`, so it must be yielded inside `Effect.gen` with `Effect.scoped`. + +```ts +import { generateText, tool } from "ff-effect/for/ai" +import { openai } from "@ai-sdk/openai" +import { Effect } from "effect" +import { z } from "zod" + +class WeatherService extends Effect.Service()("WeatherService", { + succeed: { + getTemperature: (city: string) => Effect.succeed(22), + }, +}) {} + +const program = Effect.scoped( + Effect.gen(function* () { + const weatherTool = yield* tool({ + description: "Get the current temperature for a city", + parameters: z.object({ city: z.string() }), + execute: ({ city }) => + Effect.gen(function* () { + const weather = yield* WeatherService + const temperature = yield* weather.getTemperature(city) + return { city, temperature } + }), + }) + + const result = yield* generateText({ + model: openai("gpt-4o"), + prompt: "What is the temperature in Paris?", + tools: { weather: weatherTool }, + }) + + return result.text + }) +) + +const text = await Effect.runPromise(program.pipe(Effect.provide(WeatherService.Default))) +console.log(text) +``` + +## Error handling with `AiError` + +```ts +import { generateText, AiError } from "ff-effect/for/ai" +import { openai } from "@ai-sdk/openai" +import { Effect } from "effect" + +const program = generateText({ + model: openai("gpt-4o"), + prompt: "Hello!", +}).pipe( + Effect.catchTag("ff-effect/AiError", (error) => + Effect.sync(() => { + console.error("AI SDK error:", error.cause) + return { text: "Fallback response" } + }) + ) +) + +const result = await Effect.runPromise(program) +console.log(result.text) +``` diff --git a/packages/effect/package.json b/packages/effect/package.json index cf9ae85..312b164 100644 --- a/packages/effect/package.json +++ b/packages/effect/package.json @@ -30,6 +30,7 @@ "@total-typescript/tsconfig": "^1.0.4", "@types/bun": "^1.3.2", "@typescript/native-preview": "^7.0.0-dev.20260122.4", + "ai": "^6.0.102", "inngest": "^3.52.3", "tsup": "^8.5.0", "typescript": "^5.9.3", @@ -37,11 +38,15 @@ "vitest": "^4.0.10" }, "peerDependencies": { + "ai": "^6", "drizzle-orm": "^0.44.7", "effect": "^3.19.3", "inngest": "^3" }, "peerDependenciesMeta": { + "ai": { + "optional": true + }, "drizzle-orm": { "optional": true }, diff --git a/packages/effect/src/for/ai/index.test.ts b/packages/effect/src/for/ai/index.test.ts new file mode 100644 index 0000000..d0e3ca4 --- /dev/null +++ b/packages/effect/src/for/ai/index.test.ts @@ -0,0 +1,244 @@ +import * as Ai from 'ai'; +import { Effect } from 'effect'; +import { describe, expect, test, vi } from 'vitest'; +import { AiError, generateText, streamText, tool } from './index.js'; + +vi.mock('ai', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + generateText: vi.fn(), + streamText: vi.fn(), + }; +}); + +describe('generateText', () => { + test('wraps promise and returns result', () => { + const mockResult = { text: 'hello', finishReason: 'stop' }; + vi.mocked(Ai.generateText).mockResolvedValue(mockResult as never); + + return Effect.gen(function* () { + const result = yield* generateText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + }); + + expect(result).toBe(mockResult); + expect(Ai.generateText).toHaveBeenCalledWith( + expect.objectContaining({ prompt: 'test' }), + ); + }).pipe(Effect.runPromise); + }); + + test('surfaces errors as AiError', () => + Effect.gen(function* () { + vi.mocked(Ai.generateText).mockRejectedValue(new Error('API error')); + + const result = yield* generateText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + }).pipe(Effect.flip); + + expect(result).toBeInstanceOf(AiError); + expect(result.message).toBe('generateText failed'); + expect(result.cause).toBeInstanceOf(Error); + expect((result.cause as Error).message).toBe('API error'); + }).pipe(Effect.runPromise)); + + test('wraps onStepFinish callback', () => { + const stepResult = { text: 'step1' }; + vi.mocked(Ai.generateText).mockImplementation( + // biome-ignore lint/suspicious/noExplicitAny: test mock + (async (params: any) => { + if (params.onStepFinish) await params.onStepFinish(stepResult); + return { text: 'done' }; + }) as never, + ); + + const onStepFinishSpy = vi.fn(() => Effect.void); + + return Effect.gen(function* () { + yield* generateText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + onStepFinish: onStepFinishSpy, + }); + + expect(onStepFinishSpy).toHaveBeenCalledWith(stepResult); + }).pipe(Effect.runPromise); + }); + + test('wraps onFinish callback', () => { + const finishEvent = { text: 'done', steps: [], totalUsage: {} }; + vi.mocked(Ai.generateText).mockImplementation( + // biome-ignore lint/suspicious/noExplicitAny: test mock + (async (params: any) => { + if (params.onFinish) await params.onFinish(finishEvent); + return { text: 'done' }; + }) as never, + ); + + const onFinishSpy = vi.fn(() => Effect.void); + + return Effect.gen(function* () { + yield* generateText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + onFinish: onFinishSpy, + }); + + expect(onFinishSpy).toHaveBeenCalledWith(finishEvent); + }).pipe(Effect.runPromise); + }); +}); + +describe('streamText', () => { + test('wraps synchronous return', () => { + const mockResult = { textStream: 'mock-stream' }; + vi.mocked(Ai.streamText).mockReturnValue(mockResult as never); + + return Effect.gen(function* () { + const result = yield* streamText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + }); + + expect(result).toBe(mockResult); + expect(Ai.streamText).toHaveBeenCalledWith( + expect.objectContaining({ prompt: 'test' }), + ); + }).pipe(Effect.scoped, Effect.runPromise); + }); + + test('surfaces thrown errors as AiError', () => { + vi.mocked(Ai.streamText).mockImplementation(() => { + throw new Error('stream error'); + }); + + return Effect.gen(function* () { + const result = yield* streamText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + }).pipe(Effect.flip); + + expect(result).toBeInstanceOf(AiError); + expect(result.message).toBe('streamText failed'); + expect(result.cause).toBeInstanceOf(Error); + expect((result.cause as Error).message).toBe('stream error'); + }).pipe(Effect.scoped, Effect.runPromise); + }); + + test('wraps onFinish callback', () => { + const finishEvent = { text: 'done', steps: [], totalUsage: {} }; + vi.mocked(Ai.streamText).mockImplementation( + // biome-ignore lint/suspicious/noExplicitAny: test mock + ((params: any) => { + if (params.onFinish) params.onFinish(finishEvent); + return { textStream: 'mock' }; + }) as never, + ); + + const onFinishSpy = vi.fn(() => Effect.void); + + return Effect.gen(function* () { + yield* streamText({ + model: {} as Ai.LanguageModel, + prompt: 'test', + onFinish: onFinishSpy, + }); + + expect(onFinishSpy).toHaveBeenCalledWith(finishEvent); + }).pipe(Effect.scoped, Effect.runPromise); + }); +}); + +describe('tool', () => { + test('wraps execute with Effect -> Promise bridging', () => + Effect.gen(function* () { + const myTool = yield* tool({ + description: 'test tool', + inputSchema: { type: 'object' } as unknown as Ai.FlexibleSchema<{ + city: string; + }>, + execute: (input) => Effect.succeed(`Weather in ${input.city}: sunny`), + }); + + expect(myTool.description).toBe('test tool'); + expect(myTool.execute).toBeDefined(); + + // biome-ignore lint/style/noNonNullAssertion: asserted above + const execute = myTool.execute!; + const result = yield* Effect.promise(() => + Promise.resolve( + execute( + { city: 'London' }, + { + toolCallId: 'test-id', + messages: [], + }, + ), + ), + ); + expect(result).toBe('Weather in London: sunny'); + }).pipe(Effect.scoped, Effect.runPromise)); + + test('passes through non-callback properties', () => + Effect.gen(function* () { + const myTool = yield* tool({ + description: 'my tool', + title: 'My Tool', + inputSchema: { type: 'object' } as unknown as Ai.FlexibleSchema<{ + x: number; + }>, + strict: true, + }); + + expect(myTool.description).toBe('my tool'); + expect(myTool.title).toBe('My Tool'); + expect(myTool.strict).toBe(true); + }).pipe(Effect.scoped, Effect.runPromise)); + + test('execute handler can access Effect services', () => { + class WeatherService extends Effect.Service()( + 'WeatherService', + { + succeed: { + getWeather: (city: string) => `${city}: rainy`, + }, + }, + ) {} + + return Effect.gen(function* () { + const myTool = yield* tool({ + description: 'weather', + inputSchema: { type: 'object' } as unknown as Ai.FlexibleSchema<{ + city: string; + }>, + execute: (input) => + Effect.gen(function* () { + const weather = yield* WeatherService; + return weather.getWeather(input.city); + }), + }); + + // biome-ignore lint/style/noNonNullAssertion: test assertion + const execute = myTool.execute!; + const result = yield* Effect.promise(() => + Promise.resolve( + execute( + { city: 'Paris' }, + { + toolCallId: 'test-id', + messages: [], + }, + ), + ), + ); + expect(result).toBe('Paris: rainy'); + }).pipe( + Effect.scoped, + Effect.provide(WeatherService.Default), + Effect.runPromise, + ); + }); +}); diff --git a/packages/effect/src/for/ai/index.ts b/packages/effect/src/for/ai/index.ts new file mode 100644 index 0000000..91cb99d --- /dev/null +++ b/packages/effect/src/for/ai/index.ts @@ -0,0 +1,200 @@ +import * as Ai from 'ai'; +import { Data, Effect, FiberSet, type Scope } from 'effect'; + +export { describe, effectSchema } from './schema'; + +export class AiError extends Data.TaggedError('ff-effect/AiError')<{ + message: string; + cause?: unknown; +}> {} + +// biome-ignore lint/suspicious/noExplicitAny: internal bridging helper, type safety enforced at public API boundary +function wrapCallback(runPromise: any, callback: any) { + if (callback == null) return undefined; + // biome-ignore lint/suspicious/noExplicitAny: internal bridging helper + return (...args: any[]) => runPromise(callback(...args)); +} + +type GenerateTextCallbackKeys = + | 'onStepFinish' + | 'onFinish' + | 'experimental_onStart' + | 'experimental_onStepStart' + | 'experimental_onToolCallStart' + | 'experimental_onToolCallFinish'; + +type StreamTextCallbackKeys = + | 'onChunk' + | 'onError' + | 'onFinish' + | 'onAbort' + | 'onStepFinish' + | 'experimental_onStart' + | 'experimental_onStepStart' + | 'experimental_onToolCallStart' + | 'experimental_onToolCallFinish'; + +type EffectifyCallbacks = Omit & { + [K in Keys & keyof T]?: NonNullable extends ( + ...args: infer A + ) => unknown + ? (...args: A) => Effect.Effect + : T[K]; +}; + +type GenerateTextOriginalParams = Parameters[0]; +type GenerateTextReturn = Awaited>; + +export function generateText( + params: EffectifyCallbacks< + GenerateTextOriginalParams, + GenerateTextCallbackKeys, + R + >, +): Effect.Effect { + return Effect.gen(function* () { + const runPromise = yield* FiberSet.makeRuntimePromise(); + + const originalParams = { + ...params, + onStepFinish: wrapCallback(runPromise, params.onStepFinish), + onFinish: wrapCallback(runPromise, params.onFinish), + experimental_onStart: wrapCallback( + runPromise, + params.experimental_onStart, + ), + experimental_onStepStart: wrapCallback( + runPromise, + params.experimental_onStepStart, + ), + experimental_onToolCallStart: wrapCallback( + runPromise, + params.experimental_onToolCallStart, + ), + experimental_onToolCallFinish: wrapCallback( + runPromise, + params.experimental_onToolCallFinish, + ), + } as GenerateTextOriginalParams; + + return yield* Effect.tryPromise({ + try: () => Ai.generateText(originalParams), + catch: (cause) => new AiError({ message: 'generateText failed', cause }), + }); + }).pipe(Effect.scoped); +} + +type StreamTextOriginalParams = Parameters[0]; +type StreamTextReturn = ReturnType; + +export function streamText( + params: EffectifyCallbacks< + StreamTextOriginalParams, + StreamTextCallbackKeys, + R + >, +): Effect.Effect { + return Effect.gen(function* () { + const runPromise = yield* FiberSet.makeRuntimePromise(); + + const originalParams = { + ...params, + onChunk: wrapCallback(runPromise, params.onChunk), + onError: wrapCallback(runPromise, params.onError), + onFinish: wrapCallback(runPromise, params.onFinish), + onAbort: wrapCallback(runPromise, params.onAbort), + onStepFinish: wrapCallback(runPromise, params.onStepFinish), + experimental_onStart: wrapCallback( + runPromise, + params.experimental_onStart, + ), + experimental_onStepStart: wrapCallback( + runPromise, + params.experimental_onStepStart, + ), + experimental_onToolCallStart: wrapCallback( + runPromise, + params.experimental_onToolCallStart, + ), + experimental_onToolCallFinish: wrapCallback( + runPromise, + params.experimental_onToolCallFinish, + ), + } as StreamTextOriginalParams; + + try { + return Ai.streamText(originalParams); + } catch (cause) { + return yield* Effect.fail( + new AiError({ message: 'streamText failed', cause }), + ); + } + }); +} + +type OriginalToolDef = Parameters< + typeof Ai.tool +>[0]; + +type ToolModelOutput = Awaited< + ReturnType['toModelOutput']>> +>; + +type EffectToolDef = Omit< + OriginalToolDef, + | 'execute' + | 'onInputStart' + | 'onInputDelta' + | 'onInputAvailable' + | 'toModelOutput' +> & { + execute?: ( + input: INPUT, + options: Ai.ToolExecutionOptions, + ) => Effect.Effect; + onInputStart?: ( + options: Ai.ToolExecutionOptions, + ) => Effect.Effect; + onInputDelta?: ( + options: { inputTextDelta: string } & Ai.ToolExecutionOptions, + ) => Effect.Effect; + onInputAvailable?: ( + options: { input: INPUT } & Ai.ToolExecutionOptions, + ) => Effect.Effect; + toModelOutput?: (options: { + toolCallId: string; + input: INPUT; + output: OUTPUT; + }) => Effect.Effect; +}; + +export function tool( + params: EffectToolDef, +): Effect.Effect, never, R | Scope.Scope> { + return Effect.gen(function* () { + const runPromise = yield* FiberSet.makeRuntimePromise(); + + const originalParams = { + ...params, + ...(params.execute && { + execute: (input: INPUT, options: Ai.ToolExecutionOptions) => + // biome-ignore lint/style/noNonNullAssertion: guarded by truthiness check + runPromise(params.execute!(input, options)), + }), + onInputStart: wrapCallback(runPromise, params.onInputStart), + onInputDelta: wrapCallback(runPromise, params.onInputDelta), + onInputAvailable: wrapCallback(runPromise, params.onInputAvailable), + ...(params.toModelOutput && { + toModelOutput: (options: { + toolCallId: string; + input: INPUT; + output: OUTPUT; + }) => + // biome-ignore lint/style/noNonNullAssertion: guarded by truthiness check + runPromise(params.toModelOutput!(options)), + }), + } as OriginalToolDef; + + return Ai.tool(originalParams); + }); +} diff --git a/packages/effect/src/for/ai/schema.ts b/packages/effect/src/for/ai/schema.ts new file mode 100644 index 0000000..36a15e9 --- /dev/null +++ b/packages/effect/src/for/ai/schema.ts @@ -0,0 +1,20 @@ +import { jsonSchema as aiJsonSchema } from 'ai'; +import { Either, JSONSchema, Schema } from 'effect'; + +export function effectSchema(schema: Schema.Schema) { + const decode = Schema.decodeUnknownEither(schema); + return aiJsonSchema(JSONSchema.make(schema), { + validate: (value) => { + const result = decode(value); + if (Either.isRight(result)) { + return { success: true as const, value: result.right }; + } + return { success: false as const, error: new Error(String(result.left)) }; + }, + }); +} + +export const describe = + (d: string) => + (self: Schema.Schema) => + self.annotations({ description: d }); diff --git a/packages/effect/tsup.config.ts b/packages/effect/tsup.config.ts index e552c20..8bc1f65 100644 --- a/packages/effect/tsup.config.ts +++ b/packages/effect/tsup.config.ts @@ -7,5 +7,5 @@ export default defineConfig({ splitting: false, sourcemap: true, clean: true, - external: ['effect', 'inngest', 'inngest/bun'], + external: ['effect', 'ai', 'inngest', 'inngest/bun'], });