
Commit 3142377

feat(ai-bot): create event from url job (#2078)
1 parent 2c4c7f4 commit 3142377

6 files changed: +212 -9 lines changed

packages/@liexp/backend/src/providers/ai/langchain.provider.ts

Lines changed: 96 additions & 2 deletions
@@ -1,12 +1,19 @@
 import { type Document as LangchainDocument } from "@langchain/core/documents";
-import { StringOutputParser } from "@langchain/core/output_parsers";
+import {
+  JsonOutputParser,
+  StringOutputParser,
+} from "@langchain/core/output_parsers";
 import { PromptTemplate } from "@langchain/core/prompts";
 import {
   RunnablePassthrough,
   RunnableSequence,
 } from "@langchain/core/runnables";
 import { ChatOpenAI, OpenAIEmbeddings } from "@langchain/openai";
 import { GetLogger } from "@liexp/core/lib/logger/index.js";
+import {
+  type CreateEventBody,
+  type EventType,
+} from "@liexp/shared/lib/io/http/Events/index.js";
 import type * as Reader from "fp-ts/lib/Reader.js";
 import { loadSummarizationChain } from "langchain/chains";
 import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";
@@ -41,7 +48,6 @@ Below you find the text you need to summarize.
 
 export interface LangchainProvider {
   chat: ChatOpenAI;
-  // embeddings: OpenAIEmbeddings;
   queryDocument: (
     url: LangchainDocument[],
     question: string,
@@ -51,6 +57,13 @@ export interface LangchainProvider {
     text: LangchainDocument[],
     options?: { model?: AvailableModels; prompt?: string; question?: string },
   ) => Promise<string>;
+  createEventFromDocuments: (
+    content: LangchainDocument[],
+    question: string,
+    type: EventType,
+    prompt: string,
+    options?: { model?: AvailableModels },
+  ) => Promise<string>;
 }
 
 export type AvailableModels =
@@ -111,6 +124,7 @@ export const GetLangchainProvider = (
   const embeddings = new OpenAIEmbeddings({
     model,
     apiKey: opts.apiKey,
+    timeout: 1000 * 60 * 30, // 30 minutes (the OpenAI client timeout is in ms)
     configuration: {
       baseURL: opts.baseURL,
     },
@@ -129,6 +143,7 @@ export const GetLangchainProvider = (
 
   // Retrieve and generate using the relevant snippets of the blog.
   const retriever = vectorStore.asRetriever();
+
   const prompt = PromptTemplate.fromTemplate(EMBEDDINGS_PROMPT);
 
   const ragChain = RunnableSequence.from([
@@ -190,6 +205,85 @@ export const GetLangchainProvider = (
 
       return output;
     },
+    createEventFromDocuments: async (
+      content,
+      question,
+      type,
+      prompt,
+      options,
+    ) => {
+      const model =
+        options?.model ?? opts.models?.embeddings ?? "text-embedding-ada-002";
+      const chatModel = options?.model ?? opts.models?.chat ?? "gpt-4o";
+
+      const embeddings = new OpenAIEmbeddings({
+        model,
+        apiKey: opts.apiKey,
+        configuration: {
+          baseURL: opts.baseURL,
+        },
+      });
+
+      const chat = new ChatOpenAI({
+        model: chatModel,
+        temperature: 0,
+        apiKey: opts.apiKey,
+        timeout: 1000 * 60 * 30, // 30 minutes
+        maxRetries: 3,
+        configuration: {
+          baseURL: opts.baseURL,
+        },
+        streaming: true,
+        onFailedAttempt: (error) => {
+          langchainLogger.error.log("Failed attempt", error);
+          return error;
+        },
+      });
+
+      const textSplitter = new RecursiveCharacterTextSplitter({
+        chunkSize: 2000,
+        chunkOverlap: 100,
+      });
+      const splits = await textSplitter.splitDocuments(content);
+
+      const vectorStore = await MemoryVectorStore.fromDocuments(
+        splits,
+        embeddings,
+      );
+
+      // Retrieve and generate using the relevant snippets of the document.
+      const retriever = vectorStore.asRetriever({ verbose: true });
+
+      const formatInstructions = `Respond with a valid JSON object, containing the fields: "title" and "date", considering the content describes an event of type "${type}".`;
+      const promptTemplate = await PromptTemplate.fromTemplate(prompt).partial({
+        format_instructions: formatInstructions,
+      });
+
+      // Set up a parser + inject instructions into the prompt template.
+      const parser = new JsonOutputParser<CreateEventBody>();
+
+      const ragChain = RunnableSequence.from([
+        {
+          context: retriever.pipe(formatDocumentsAsString),
+          question: new RunnablePassthrough(),
+        },
+        promptTemplate.pipe(chat).pipe(parser),
+      ]);
+
+      const stream = await ragChain.stream({
+        question,
+      });
+
+      let output: Partial<CreateEventBody> = {};
+      for await (const chunk of stream) {
+        langchainLogger.debug.log("chunk", chunk);
+        output = chunk; // JsonOutputParser streams cumulative parses; keep the latest
+      }
+
+      langchainLogger.info.log("output", output);
+
+      return JSON.stringify(output);
+    },
   };
 };
 
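For context, a minimal usage sketch of the new createEventFromDocuments method (not part of this commit). The provider instance, the event type value, and the prompt text are assumptions; the prompt only needs to expose the {context}, {question}, and {format_instructions} variables that the chain fills in.

import { Document } from "@langchain/core/documents";
import { type LangchainProvider } from "@liexp/backend/lib/providers/ai/langchain.provider.js";
import { type EventType } from "@liexp/shared/lib/io/http/Events/index.js";

// {context} and {question} are supplied by the RAG chain at run time;
// {format_instructions} is partially applied inside createEventFromDocuments.
const prompt = [
  "Use the following context to extract a single event.",
  "",
  "{context}",
  "",
  "Question: {question}",
  "",
  "{format_instructions}",
].join("\n");

// Hypothetical caller; `langchain` would come from GetLangchainProvider.
async function extractEvent(langchain: LangchainProvider): Promise<void> {
  const docs = [
    new Document({
      pageContent: "On 2024-03-12 the council approved the new housing plan...",
    }),
  ];
  const json = await langchain.createEventFromDocuments(
    docs,
    "What event does this article describe?",
    "Uncategorized" as EventType, // illustrative event type value
    prompt,
  );
  // The provider returns the parsed event re-serialized as a JSON string.
  console.log(JSON.parse(json));
}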
createEventFromText.flow.ts

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+import { createEventFromText } from "@liexp/backend/lib/flows/ai/createEventFromText.flow.js";
+import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
+import { fp, pipe } from "@liexp/core/lib/fp/index.js";
+import { type CreateEventFromTextQueueData } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
+import { toAIBotError } from "../../common/error/index.js";
+import { type ClientContext } from "../../context.js";
+import { loadDocs } from "./common/loadDocs.flow.js";
+import { getPromptFromResource } from "./prompts.js";
+import { type JobProcessRTE } from "#services/job-processor/job-processor.service.js";
+
+export const createEventFromTextFlow: JobProcessRTE<
+  CreateEventFromTextQueueData
+> = (job) => {
+  return pipe(
+    fp.RTE.Do,
+    fp.RTE.bind("docs", () => loadDocs(job)),
+    fp.RTE.bind(
+      "jsonSchema",
+      () => (ctx) =>
+        pipe(
+          ctx.endpointsRESTClient.Endpoints.Event.getList({
+            filter: { eventType: [job.data.type] },
+            sort: { field: "updatedAt", order: "DESC" },
+          }),
+          fp.TE.map((events) => events.data[0]),
+          fp.TE.mapLeft(toAIBotError),
+        ),
+    ),
+    fp.RTE.chain(({ docs, jsonSchema }) =>
+      createEventFromText<ClientContext>(
+        docs,
+        job.data.text,
+        job.data.type,
+        job.prompt ?? getPromptFromResource(job.resource, job.type),
+        JSON.stringify(jsonSchema),
+      ),
+    ),
+    fp.RTE.map((event) => JSON.stringify(event)),
+    LoggerService.RTE.debug("`createEventFromTextFlow` result: %O"),
+  );
+};
createEventFromURL.flow.ts

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+import { type AvailableModels } from "@liexp/backend/lib/providers/ai/langchain.provider.js";
+import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
+import { fp, pipe } from "@liexp/core/lib/fp/index.js";
+import { type CreateEventFromURLQueueData } from "@liexp/shared/lib/io/http/Queue/CreateEventFromURLQueue.js";
+import { toAIBotError } from "../../common/error/index.js";
+import { loadDocs } from "./common/loadDocs.flow.js";
+import { getPromptFromResource } from "./prompts.js";
+import { type JobProcessRTE } from "#services/job-processor/job-processor.service.js";
+
+const defaultQuestion = "";
+
+export const createEventFromURLFlow: JobProcessRTE<
+  CreateEventFromURLQueueData
+> = (job) => (ctx) => {
+  return pipe(
+    loadDocs(job)(ctx),
+    fp.TE.chain((docs) =>
+      fp.TE.tryCatch(() => {
+        return ctx.langchain.createEventFromDocuments(
+          docs,
+          job.question ?? defaultQuestion,
+          job.data.type,
+          job.prompt ?? getPromptFromResource(job.resource, job.type),
+          {
+            model: ctx.config.config.localAi.models
+              ?.embeddings as AvailableModels,
+          },
+        );
+      }, toAIBotError),
+    ),
+    LoggerService.TE.debug(ctx, "`createEventFromURLFlow` result: %O"),
+  );
+};
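Both flows are ReaderTaskEither pipelines: applying the job and then the context yields a lazy task that only runs when invoked. A minimal sketch of how the job-processor runtime might execute one (not part of this commit; the job value, the logging, and the import paths are assumptions):

import { pipe } from "@liexp/core/lib/fp/index.js";
import * as E from "fp-ts/lib/Either.js";
import { type ClientContext } from "../../context.js";
import { createEventFromURLFlow } from "./createEventFromURL.flow.js";

// Hypothetical runner: `job` is a queued CreateEventFromURL job and `ctx`
// is the ai-bot ClientContext; both come from the job-processor runtime.
const runJob = async (job: any, ctx: ClientContext): Promise<void> => {
  // (job) => (ctx) => TaskEither: apply job and context, then run the task.
  const result = await createEventFromURLFlow(job)(ctx)();
  pipe(
    result,
    E.fold(
      (err) => {
        console.error("job failed", err);
      },
      (json) => {
        console.log("event payload", json);
      },
    ),
  );
};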
Lines changed: 4 additions & 7 deletions

@@ -1,21 +1,18 @@
-import { fp } from "@liexp/core/lib/fp/index.js";
 import { OpenAICreateEventFromTextType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
 import { OpenAICreateEventFromURLType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromURLQueue.js";
 import {
   OpenAIEmbeddingQueueType,
   OpenAISummarizeQueueType,
 } from "@liexp/shared/lib/io/http/Queue/index.js";
-import { type Queue } from "@liexp/shared/lib/io/http/index.js";
-import { type ClientContextRTE } from "../../types.js";
+import { createEventFromTextFlow } from "./createEventFromText.flow.js";
+import { createEventFromURLFlow } from "./createEventFromURL.flow.js";
 import { embedAndQuestionFlow } from "./embedAndQuestion.js";
 import { summarizeTextFlow } from "./summarizeTexFlow.js";
 import { GetJobProcessor } from "#services/job-processor/job-processor.service.js";
 
 export const JobProcessor = GetJobProcessor({
   [OpenAISummarizeQueueType.value]: summarizeTextFlow,
   [OpenAIEmbeddingQueueType.value]: embedAndQuestionFlow,
-  [OpenAICreateEventFromURLType.value]: (
-    job: Queue.Queue,
-  ): ClientContextRTE<string> => fp.RTE.right(""),
-  [OpenAICreateEventFromTextType.value]: (job: Queue.Queue) => fp.RTE.right(""),
+  [OpenAICreateEventFromURLType.value]: createEventFromURLFlow,
+  [OpenAICreateEventFromTextType.value]: createEventFromTextFlow,
 });

services/api/test/vitest.config.e2e.ts

Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@ const config = extendBaseConfig(import.meta.url, (toAlias) => ({
   globalSetup: [toAlias(`globalSetup.ts`)],
   exclude: ["**/build", "**/src/migrations", "**/src/scripts"],
   pool: "forks",
+  bail: 1,
   poolOptions: {
     forks: {
       singleFork: process.env.CI === "true" ? true : false,

services/worker/src/jobs/processOpenAIJobsDone.job.ts

Lines changed: 37 additions & 0 deletions

@@ -13,6 +13,9 @@ import {
 } from "@liexp/backend/lib/services/entity-repository.service.js";
 import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
 import { fp, pipe } from "@liexp/core/lib/fp/index.js";
+import { DecodeError } from "@liexp/shared/lib/io/http/Error/DecodeError.js";
+import { Event } from "@liexp/shared/lib/io/http/Events/index.js";
+import { OpenAICreateEventFromTextType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
 import { type Queue } from "@liexp/shared/lib/io/http/index.js";
 import { Equal, type FindOptionsWhere } from "typeorm";
 import { type RTE } from "../types.js";
@@ -60,6 +63,33 @@ const processDoneJobBlockNoteResult =
     );
   };
 
+const processDoneJobEventResult =
+  (dbService: EntityRepository<EventV2Entity>) =>
+  (job: Queue.Queue): RTE<Queue.Queue> => {
+    return pipe(
+      fp.RTE.Do,
+      fp.RTE.bind("event", () => {
+        return pipe(
+          fp.RTE.of(job.result),
+          fp.RTE.chainEitherK(Event.decode),
+          fp.RTE.mapLeft((errs) => DecodeError.of("Event", errs)),
+        );
+      }),
+      fp.RTE.chain(({ event }) =>
+        dbService.save([
+          {
+            ...event,
+            links: event.links.map((l) => ({ id: l })),
+            media: event.media.map((m) => ({ id: m })),
+            keywords: event.keywords.map((k) => ({ id: k })),
+            socialPosts: event.socialPosts?.map((s) => ({ id: s })),
+          },
+        ]),
+      ),
+      fp.RTE.map(() => job),
+    );
+  };
+
 export const processDoneJob = (job: Queue.Queue): RTE<Queue.Queue> => {
   return pipe(
     fp.RTE.right(job),
@@ -105,6 +135,13 @@ export const processDoneJob = (job: Queue.Queue): RTE<Queue.Queue> => {
     }
 
     if (job.resource === "events") {
+      if (OpenAICreateEventFromTextType.is(job.type)) {
+        return pipe(
+          processDoneJobEventResult(EventRepository)(job),
+          fp.RTE.map(() => job),
+        );
+      }
+
       return pipe(
         processDoneJobBlockNoteResult(EventRepository)(job),
         fp.RTE.map(() => job),

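The worker validates the AI output with the io-ts Event codec before touching the database. A sketch of that decode step in isolation, with a hypothetical payload standing in for job.result (only the calls shown in the diff are used):

import { pipe } from "@liexp/core/lib/fp/index.js";
import * as E from "fp-ts/lib/Either.js";
import { DecodeError } from "@liexp/shared/lib/io/http/Error/DecodeError.js";
import { Event } from "@liexp/shared/lib/io/http/Events/index.js";

// Hypothetical `job.result` payload produced by the ai-bot.
const payload: unknown = {
  /* title, date, type, links, media, keywords, ... */
};

// io-ts decoding yields an Either; invalid payloads become a DecodeError
// instead of reaching dbService.save, mirroring processDoneJobEventResult.
const decoded = pipe(
  Event.decode(payload),
  E.mapLeft((errs) => DecodeError.of("Event", errs)),
);

console.log(E.isRight(decoded) ? "valid event" : "decode failed");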