From 882c70c2ef8259276c654d4c6c8ba73f1fd6a907 Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 21 Nov 2025 12:24:06 +0900 Subject: [PATCH 01/13] feat(api): add VectorStore annotation for agent plan resource management --- .../agents/api/annotation/VectorStore.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java diff --git a/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java b/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java new file mode 100644 index 00000000..1a807e46 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to mark a field or method as a vector store resource that should be managed by the agent plan. + * + *

Fields annotated with @VectorStore will be scanned during agent plan creation and corresponding + * resource providers will be created to manage the vector store instances. Methods annotated with @VectorStore + * must be static and will be wrapped as vector store resources. + */ +@Target({ElementType.FIELD, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface VectorStore { +} From 00e9009006a10ec1f47c0c170373cc05710bbcc4 Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 21 Nov 2025 12:44:30 +0900 Subject: [PATCH 02/13] chore : spotless --- .../flink/agents/api/annotation/VectorStore.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java b/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java index 1a807e46..cc7b0928 100644 --- a/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java +++ b/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java @@ -24,13 +24,13 @@ import java.lang.annotation.Target; /** - * Annotation to mark a field or method as a vector store resource that should be managed by the agent plan. + * Annotation to mark a field or method as a vector store resource that should be managed by the + * agent plan. * - *

Fields annotated with @VectorStore will be scanned during agent plan creation and corresponding - * resource providers will be created to manage the vector store instances. Methods annotated with @VectorStore - * must be static and will be wrapped as vector store resources. + *

Fields annotated with @VectorStore will be scanned during agent plan creation and + * corresponding resource providers will be created to manage the vector store instances. Methods + * annotated with @VectorStore must be static and will be wrapped as vector store resources. */ @Target({ElementType.FIELD, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) -public @interface VectorStore { -} +public @interface VectorStore {} From 2b7bc58da2dddfae9d89cdaea5e7eb12265075a6 Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 21 Nov 2025 12:48:32 +0900 Subject: [PATCH 03/13] feat(api): add context retrieval request and response events --- .../event/ContextRetrievalRequestEvent.java | 69 +++++++++++++++++++ .../event/ContextRetrievalResponseEvent.java | 69 +++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java create mode 100644 api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java new file mode 100644 index 00000000..8f9ea9ca --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.event; + +import org.apache.flink.agents.api.Event; + +/** Event representing a request for context retrieval. */ +public class ContextRetrievalRequestEvent extends Event { + + private static final int DEFAULT_MAX_RESULTS = 3; + + private final String query; + private final String vectorStore; + private final int maxResults; + + public ContextRetrievalRequestEvent(String query, String vectorStore) { + this.query = query; + this.vectorStore = vectorStore; + this.maxResults = DEFAULT_MAX_RESULTS; + } + + public ContextRetrievalRequestEvent(String query, String vectorStore, int maxResults) { + this.query = query; + this.vectorStore = vectorStore; + this.maxResults = maxResults; + } + + public String getQuery() { + return query; + } + + public String getVectorStore() { + return vectorStore; + } + + public int getMaxResults() { + return maxResults; + } + + @Override + public String toString() { + return "ContextRetrievalRequestEvent{" + + "query='" + + query + + '\'' + + ", vectorStore='" + + vectorStore + + '\'' + + ", maxResults=" + + maxResults + + '}'; + } +} diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java new file mode 100644 index 00000000..195c12e8 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.event; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.vectorstores.Document; + +import java.util.List; +import java.util.UUID; + +/** + * Event representing retrieved context results. + * + * @param + */ +public class ContextRetrievalResponseEvent extends Event { + + private final UUID requestId; + private final String query; + private final List> documents; + + public ContextRetrievalResponseEvent( + UUID requestId, String query, List> documents) { + this.requestId = requestId; + this.query = query; + this.documents = documents; + } + + public UUID getRequestId() { + return requestId; + } + + public String getQuery() { + return query; + } + + public List> getDocuments() { + return documents; + } + + @Override + public String toString() { + return "ContextRetrievalResponseEvent{" + + "requestId=" + + requestId + + ", query='" + + query + + '\'' + + ", documents=" + + documents + + '}'; + } +} From 8e128bad8f19a378c9af481456567870ecd78d50 Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 21 Nov 2025 12:57:43 +0900 Subject: [PATCH 04/13] feat(api): add vector store API with query support and document handling --- .../api/vectorstores/BaseVectorStore.java | 96 +++++++++++++++++++ 
.../agents/api/vectorstores/Document.java | 60 ++++++++++++ .../api/vectorstores/VectorStoreQuery.java | 93 ++++++++++++++++++ .../vectorstores/VectorStoreQueryMode.java | 37 +++++++ .../vectorstores/VectorStoreQueryResult.java | 48 ++++++++++ 5 files changed, 334 insertions(+) create mode 100644 api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java create mode 100644 api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java create mode 100644 api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java create mode 100644 api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java create mode 100644 api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java new file mode 100644 index 00000000..3d65e866 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.api.vectorstores; + +import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelConnection; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; + +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Base abstract class for vector store. Provides vector store functionality that integrates + * embedding models for text-based semantic search. Handles both connection management and embedding + * generation internally. + * + * @param The type of content stored in the vector store documents + */ +public abstract class BaseVectorStore extends Resource { + + /** Name of the embedding model resource to use. */ + protected final String embeddingModel; + + public BaseVectorStore( + ResourceDescriptor descriptor, BiFunction getResource) { + super(descriptor, getResource); + this.embeddingModel = descriptor.getArgument("embedding_model"); + } + + @Override + public ResourceType getResourceType() { + return ResourceType.VECTOR_STORE; + } + + /** + * Returns vector store setup settings passed to connection. These parameters are merged with + * query-specific parameters when performing vector search operations. + * + * @return A map containing the store configuration parameters + */ + public abstract Map getStoreKwargs(); + + /** + * Performs vector search using structured query object. Converts text query to embeddings and + * returns structured query result. 
+ * + * @param query VectorStoreQuery object containing query parameters + * @return VectorStoreQueryResult containing the retrieved documents + */ + public VectorStoreQueryResult query(VectorStoreQuery query) { + final BaseEmbeddingModelConnection embeddingModel = + (BaseEmbeddingModelConnection) + this.getResource.apply(this.embeddingModel, ResourceType.EMBEDDING_MODEL); + + // TODO + // for now, we don't need to use additional parameters. + final float[] queryEmbedding = embeddingModel.embed(query.getQueryText(), Map.of()); + + final Map storeKwargs = this.getStoreKwargs(); + storeKwargs.putAll(query.getExtraArgs()); + + final List> documents = + this.queryEmbedding(queryEmbedding, query.getLimit(), storeKwargs); + + return new VectorStoreQueryResult<>(documents); + } + + /** + * Performs vector search using a pre-computed embedding. + * + * @param embedding The embedding vector to search with + * @param limit Maximum number of results to return + * @param args Additional arguments for the vector search + * @return List of documents matching the query embedding + */ + public abstract List> queryEmbedding( + float[] embedding, int limit, Map args); +} diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java new file mode 100644 index 00000000..887651ee --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.vectorstores; + +import java.util.Map; + +/** + * A document retrieved from vector store search. + * + *

Represents a single piece of content with associated metadata. This class is generic to + * support different content types while maintaining consistent metadata and identification + * structure. + * + * @param the type of the document content + */ +public class Document { + + /** Unique identifier of the document (if available). */ + private final String id; + + /** The actual content of the document. */ + private final ContentT content; + + /** Document metadata such as source, author, timestamp, etc. */ + private final Map metadata; + + public Document(ContentT content, Map metadata, String id) { + this.content = content; + this.metadata = metadata; + this.id = id; + } + + public ContentT getContent() { + return content; + } + + public Map getMetadata() { + return metadata; + } + + public String getId() { + return id; + } +} diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java new file mode 100644 index 00000000..af31dc6f --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.api.vectorstores; + +import java.util.HashMap; +import java.util.Map; + +/** + * Parameters for querying a {@link BaseVectorStore}. + * + *

A query consists of a textual prompt that will be embedded (for semantic search) or used as a + * keyword string, an optional {@code limit} for the number of results, a {@link + * VectorStoreQueryMode}, and an optional map of store-specific arguments. + */ +public class VectorStoreQuery { + + /** How the query should be executed (semantic, keyword, or hybrid). */ + private final VectorStoreQueryMode mode; + /** The user-provided query text. */ + private final String queryText; + /** Maximum number of documents to return. */ + private final Integer limit; + /** Additional store-specific parameters. */ + private final Map extraArgs; + + /** + * Creates a semantic-search query with default mode {@link VectorStoreQueryMode#SEMANTIC}. + * + * @param queryText the text to embed and search for + * @param limit maximum number of results to return + */ + public VectorStoreQuery(String queryText, Integer limit) { + this.mode = VectorStoreQueryMode.SEMANTIC; + this.queryText = queryText; + this.limit = limit; + this.extraArgs = new HashMap<>(); + } + + /** + * Creates a query with explicit mode and extra arguments. + * + * @param mode the query mode + * @param queryText the text to search for + * @param limit maximum number of results to return + * @param extraArgs store-specific additional parameters + */ + public VectorStoreQuery( + VectorStoreQueryMode mode, + String queryText, + Integer limit, + Map extraArgs) { + this.mode = mode; + this.queryText = queryText; + this.limit = limit; + this.extraArgs = extraArgs; + } + + /** Returns the query mode. */ + public VectorStoreQueryMode getMode() { + return mode; + } + + /** Returns the query text. */ + public String getQueryText() { + return queryText; + } + + /** Returns the requested result limit. */ + public Integer getLimit() { + return limit; + } + + /** Returns extra store-specific arguments. 
*/ + public Map getExtraArgs() { + return extraArgs; + } +} diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java new file mode 100644 index 00000000..301ca759 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.vectorstores; + +/** + * Query execution modes for vector stores. + * + *

+ */ +public enum VectorStoreQueryMode { + /** Semantic similarity search using embeddings. */ + SEMANTIC, + /** Keyword/lexical search (store dependent). */ + KEYWORD, + /** Hybrid search combining semantic and keyword results. */ + HYBRID; +} diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java new file mode 100644 index 00000000..bb10e63d --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.vectorstores; + +import java.util.List; + +/** + * Result of a vector store query. + * + *

Wraps the list of {@link Document} instances returned by a {@link BaseVectorStore} query in a + * simple value object. + * + * @param the type of the document content in the result + */ +public class VectorStoreQueryResult { + /** The documents matched by the query, ordered by relevance. */ + private final List> documents; + + /** + * Creates a new query result. + * + * @param documents the matched documents + */ + public VectorStoreQueryResult(List> documents) { + this.documents = documents; + } + + /** Returns the matched documents. */ + public List> getDocuments() { + return documents; + } +} From 1f4a6041e5d7b552226901f346f267d948a3e315 Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 21 Nov 2025 13:11:56 +0900 Subject: [PATCH 05/13] feat: add context retrieval action with vector store support --- .../apache/flink/agents/plan/AgentPlan.java | 11 ++- .../plan/actions/ContextRetrievalAction.java | 75 +++++++++++++++++++ 2 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java index 7cb81386..cb942246 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java @@ -20,12 +20,7 @@ import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.Event; -import org.apache.flink.agents.api.annotation.ChatModelConnection; -import org.apache.flink.agents.api.annotation.ChatModelSetup; -import org.apache.flink.agents.api.annotation.EmbeddingModelConnection; -import org.apache.flink.agents.api.annotation.EmbeddingModelSetup; -import org.apache.flink.agents.api.annotation.Prompt; -import org.apache.flink.agents.api.annotation.Tool; +import org.apache.flink.agents.api.annotation.*; import org.apache.flink.agents.api.resource.Resource; import 
org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceType; @@ -33,6 +28,7 @@ import org.apache.flink.agents.api.tools.ToolMetadata; import org.apache.flink.agents.plan.actions.Action; import org.apache.flink.agents.plan.actions.ChatModelAction; +import org.apache.flink.agents.plan.actions.ContextRetrievalAction; import org.apache.flink.agents.plan.actions.ToolCallAction; import org.apache.flink.agents.plan.resourceprovider.JavaResourceProvider; import org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider; @@ -254,6 +250,7 @@ private void extractActionsFromAgent(Agent agent) throws Exception { // Add built-in actions addBuiltAction(ChatModelAction.getChatModelAction()); addBuiltAction(ToolCallAction.getToolCallAction()); + addBuiltAction(ContextRetrievalAction.getContextRetrievalAction()); // Scan the agent class for methods annotated with @Action Class agentClass = agent.getClass(); @@ -378,6 +375,8 @@ private void extractResourceProvidersFromAgent(Agent agent) throws Exception { extractResource(ResourceType.EMBEDDING_MODEL, method); } else if (method.isAnnotationPresent(EmbeddingModelConnection.class)) { extractResource(ResourceType.EMBEDDING_MODEL_CONNECTION, method); + } else if (method.isAnnotationPresent(VectorStore.class)) { + extractResource(ResourceType.VECTOR_STORE, method); } } diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java new file mode 100644 index 00000000..b9544fde --- /dev/null +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.plan.actions; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent; +import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.BaseVectorStore; +import org.apache.flink.agents.api.vectorstores.VectorStoreQuery; +import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult; +import org.apache.flink.agents.plan.JavaFunction; + +import java.util.List; + +/** Built-in action for processing context retrieval requests. 
*/ +public class ContextRetrievalAction { + + public static Action getContextRetrievalAction() throws Exception { + return new Action( + "context_retrieval_action", + new JavaFunction( + ContextRetrievalAction.class, + "processContextRetrievalRequest", + new Class[] {Event.class, RunnerContext.class}), + List.of( + ContextRetrievalRequestEvent.class.getName(), + ContextRetrievalResponseEvent.class.getName())); + } + + @SuppressWarnings("unchecked") + public static void processContextRetrievalRequest(Event event, RunnerContext ctx) + throws Exception { + if (event instanceof ContextRetrievalRequestEvent) { + final ContextRetrievalRequestEvent contextRetrievalRequestEvent = + (ContextRetrievalRequestEvent) event; + + final BaseVectorStore vectorStore = + (BaseVectorStore) + ctx.getResource( + contextRetrievalRequestEvent.getVectorStore(), + ResourceType.VECTOR_STORE); + + final VectorStoreQuery vectorStoreQuery = + new VectorStoreQuery( + contextRetrievalRequestEvent.getQuery(), + ((ContextRetrievalRequestEvent) event).getMaxResults()); + + final VectorStoreQueryResult result = vectorStore.query(vectorStoreQuery); + + ctx.sendEvent( + new ContextRetrievalResponseEvent<>( + contextRetrievalRequestEvent.getId(), + contextRetrievalRequestEvent.getQuery(), + result.getDocuments())); + } + } +} From 02e34b451a943450fb166f6ca632ac33af21b20e Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 25 Nov 2025 22:40:01 +0900 Subject: [PATCH 06/13] test: update AgentPlanTest --- .../java/org/apache/flink/agents/plan/AgentPlanTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java index e61dbb91..45a30e21 100644 --- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java +++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java @@ -137,7 +137,7 @@ public void testConstructorWithAgent() throws 
Exception { AgentPlan agentPlan = new AgentPlan(agent); // Verify that actions were collected correctly - assertThat(agentPlan.getActions().size()).isEqualTo(4); + assertThat(agentPlan.getActions().size()).isEqualTo(5); assertThat(agentPlan.getActions()).containsKey("handleInputEvent"); assertThat(agentPlan.getActions()).containsKey("handleMultipleEvents"); @@ -164,7 +164,7 @@ public void testConstructorWithAgent() throws Exception { assertThat(multiAction.getExec()).isInstanceOf(JavaFunction.class); // Verify actionsByEvent mapping - assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(6); + assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(8); // Check InputEvent mapping List inputEventActions = @@ -199,8 +199,8 @@ public void testConstructorWithAgentNoActions() throws Exception { AgentPlan agentPlan = new AgentPlan(emptyAgent); // Verify that no actions were collected - assertThat(agentPlan.getActions().size()).isEqualTo(2); - assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(3); + assertThat(agentPlan.getActions().size()).isEqualTo(3); + assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(5); } @Test From 9b9bcc0d1a2435e85b76d92ac8872fbcf2ed66ef Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 25 Nov 2025 22:53:34 +0900 Subject: [PATCH 07/13] feat(vector-stores): add Elasticsearch vector store integration --- integrations/pom.xml | 2 + .../vector-stores/elasticsearch/pom.xml | 52 +++ .../ElasticsearchVectorStore.java | 309 ++++++++++++++++++ integrations/vector-stores/pom.xml | 37 +++ 4 files changed, 400 insertions(+) create mode 100644 integrations/vector-stores/elasticsearch/pom.xml create mode 100644 integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java create mode 100644 integrations/vector-stores/pom.xml diff --git a/integrations/pom.xml b/integrations/pom.xml index a145fb7e..88ea4cfb 100644 --- a/integrations/pom.xml +++ 
b/integrations/pom.xml @@ -32,11 +32,13 @@ under the License. 1.1.5 + 8.19.0 chat-models embedding-models + vector-stores \ No newline at end of file diff --git a/integrations/vector-stores/elasticsearch/pom.xml b/integrations/vector-stores/elasticsearch/pom.xml new file mode 100644 index 00000000..2f745fee --- /dev/null +++ b/integrations/vector-stores/elasticsearch/pom.xml @@ -0,0 +1,52 @@ + + + + 4.0.0 + + + org.apache.flink + flink-agents-integrations-vector-stores + 0.2-SNAPSHOT + ../pom.xml + + + flink-agents-integrations-vector-stores-elasticsearch + Flink Agents : Integrations: Vector Stores: Elasticsearch + jar + + + + org.apache.flink + flink-agents-api + ${project.version} + + + org.apache.flink + flink-agents-plan + ${project.version} + + + co.elastic.clients + elasticsearch-java + ${elasticsearch.version} + + + + \ No newline at end of file diff --git a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java new file mode 100644 index 00000000..43435abb --- /dev/null +++ b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.integrations.vectorstores.elasticsearch; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.elasticsearch.core.search.Hit; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.BaseVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.BiFunction; + +/** + * Elasticsearch-backed implementation of a vector store. + * + *

This implementation executes approximate nearest neighbor (ANN) KNN queries against an + * Elasticsearch index that contains a dense vector field. It integrates with an embedding model + * (configured via the {@code embedding_model} resource argument inherited from {@link + * BaseVectorStore}) to convert query text into embeddings and then performs vector search using + * Elasticsearch's KNN capabilities. + * + *

Configuration is provided through {@link + * org.apache.flink.agents.api.resource.ResourceDescriptor} arguments. The most relevant ones are: + * + *

+ * + *

Example usage (aligned with ElasticsearchRagExample): + * + *

{@code
+ * ResourceDescriptor desc = ResourceDescriptor.Builder
+ *     .newBuilder(ElasticsearchVectorStore.class.getName())
+ *     .addInitialArgument("embedding_model", "textEmbedder") // name of embedding resource
+ *     .addInitialArgument("index", "my_documents")
+ *     .addInitialArgument("vector_field", "content_vector")
+ *     .addInitialArgument("dims", 768)
+ *     .addInitialArgument("host", "http://localhost:9200")
+ *     // Optional auth (API key or basic):
+ *     // .addInitialArgument("api_key_base64", "<base64-encoded-api-key>")
+ *     // .addInitialArgument("username", "elastic")
+ *     // .addInitialArgument("password", "secret")
+ *     .build();
+ * }
+ */ +public class ElasticsearchVectorStore extends BaseVectorStore> { + + /** Default vector dimensionality used when {@code dims} is not provided. */ + public static final int DEFAULT_DIMENSION = 768; + + /** Low-level Elasticsearch client used to execute search requests. */ + private final ElasticsearchClient client; + + /** Target index name. */ + private final String index; + /** Name of the dense vector field on which KNN queries are executed. */ + private final String vectorField; + /** Vector dimensionality of the {@link #vectorField}. */ + private final int dims; + /** Default value for KNN result size (can be overridden per query). */ + private final Integer k; + /** Default number of ANN candidates for KNN (can be overridden per query). */ + private final Integer numCandidates; + /** Optional default filter query in Elasticsearch JSON DSL (can be overridden per query). */ + private final String filterQuery; + + /** + * Creates a new {@code ElasticsearchVectorStore} from the provided descriptor and resource + * resolver. + * + *

The constructor reads connection, authentication, and query defaults from the descriptor + * and prepares an {@link ElasticsearchClient} instance. It also validates required arguments. + * + * @param descriptor Resource descriptor containing configuration arguments + * @param getResource Function to resolve other resources by name and type + * @throws IllegalArgumentException if required arguments are missing or invalid + */ + public ElasticsearchVectorStore( + ResourceDescriptor descriptor, BiFunction getResource) { + super(descriptor, getResource); + + // Required query-related arguments + this.index = descriptor.getArgument("index"); + this.vectorField = descriptor.getArgument("vector_field"); + final Integer dimsArg = descriptor.getArgument("dims"); + this.dims = (dimsArg != null) ? dimsArg : DEFAULT_DIMENSION; + this.filterQuery = descriptor.getArgument("filter_query"); + + this.k = descriptor.getArgument("k"); + this.numCandidates = descriptor.getArgument("num_candidates"); + + if (this.k != null && this.numCandidates != null) { + if (this.k < this.numCandidates) { + throw new IllegalArgumentException( + "'k' should be greater or equals than 'num_candidates'"); + } + } + + if (this.vectorField == null || this.vectorField.isEmpty()) { + throw new IllegalArgumentException("'vector_field' should not be null or empty"); + } + + if (this.index == null || this.index.isEmpty()) { + throw new IllegalArgumentException("'index' should not be null or empty"); + } + + // Resolve Elasticsearch HTTP hosts. 
Precedence: host -> hosts -> default localhost + final String hostUrl = descriptor.getArgument("host"); + final String hostsCsv = descriptor.getArgument("hosts"); + final List httpHosts = new ArrayList<>(); + + if (hostUrl != null) { + httpHosts.add(HttpHost.create(hostUrl)); + } else if (hostsCsv != null) { + for (String host : hostsCsv.split(",")) { + httpHosts.add(HttpHost.create(host.trim())); + } + } else { + httpHosts.add(HttpHost.create("localhost:9200")); + } + + final RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); + + // Authentication configuration: API key (preferred) or basic auth + final String username = descriptor.getArgument("username"); + final String password = descriptor.getArgument("password"); + final String apiKeyBase64 = descriptor.getArgument("api_key_base64"); + final String apiKeyId = descriptor.getArgument("api_key_id"); + final String apiKeySecret = descriptor.getArgument("api_key_secret"); + + if (apiKeyBase64 != null || (apiKeyId != null && apiKeySecret != null)) { + // Construct base64 token if only id/secret is provided + String token = apiKeyBase64; + if (token == null) { + String idColonSecret = apiKeyId + ":" + apiKeySecret; + token = + Base64.getEncoder() + .encodeToString(idColonSecret.getBytes(StandardCharsets.UTF_8)); + } + final Header[] defaultHeaders = + new Header[] {new BasicHeader("Authorization", "ApiKey " + token)}; + builder.setDefaultHeaders(defaultHeaders); + } else if (username != null && password != null) { + // Fall back to HTTP basic authentication + final BasicCredentialsProvider creds = new BasicCredentialsProvider(); + creds.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + builder.setHttpClientConfigCallback(hcb -> hcb.setDefaultCredentialsProvider(creds)); + } + + // Build the REST client and the transport layer used by the high-level client + final RestClient restClient = builder.build(); + final ElasticsearchTransport transport = 
+ new RestClientTransport(restClient, new JacksonJsonpMapper()); + this.client = new ElasticsearchClient(transport); + } + + /** + * Returns default store-level arguments collected from the descriptor. + * + *

The returned map can be merged with per-query arguments to form the complete set of + * parameters for a vector search operation. + * + * @return map of default store arguments such as {@code index}, {@code vector_field}, {@code + * dims}, and optionally {@code k}, {@code num_candidates}, {@code filter_query}. + */ + @Override + public Map getStoreKwargs() { + final Map m = new HashMap<>(); + m.put("index", this.index); + m.put("vector_field", this.vectorField); + m.put("dims", this.dims); + if (this.k != null) { + m.put("k", this.k); + } + if (this.numCandidates != null) { + m.put("num_candidates", this.numCandidates); + } + if (this.filterQuery != null) { + m.put("filter_query", this.filterQuery); + } + return m; + } + + /** + * Executes a KNN vector search using a pre-computed embedding. + * + *

The method prepares a KNN search request using the supplied {@code embedding} and merges + * default arguments from the store with the provided {@code args}. Optional filter queries + * (JSON DSL) are applied as a post filter. + * + * @param embedding The embedding vector to search with + * @param limit Maximum number of items the caller is interested in; used as a fallback for + * {@code k} if not explicitly provided + * @param args Additional arguments. Supported keys: {@code k}, {@code num_candidates}, {@code + * filter_query} + * @return A list of matching documents, possibly empty + * @throws RuntimeException if the search request fails + */ + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public List>> queryEmbedding( + float[] embedding, int limit, Map args) { + try { + int k = (int) args.getOrDefault("k", Math.max(1, limit)); + + int numCandidates = (int) args.getOrDefault("num_candidates", Math.max(100, k * 2)); + String filter = (String) args.get("filter_query"); + + + List queryVector = new ArrayList<>(embedding.length); + for (float v : embedding) queryVector.add(v); + + SearchRequest.Builder builder = + new SearchRequest.Builder() + .index(this.index) + .knn( + kb -> + kb.field(this.vectorField) + .queryVector(queryVector) + .k(k) + .numCandidates(numCandidates)); + + if (filter != null) { + builder = builder.postFilter(f -> f.withJson(new StringReader(filter))); + } + + final SearchResponse> searchResponse = + (SearchResponse) this.client.search(builder.build(), Map.class); + + final long total = searchResponse.hits().total().value(); + if (0 == total) { + return Collections.emptyList(); + } + + return getDocuments((int) total, searchResponse); + } catch (IOException e) { + throw new RuntimeException("Error performing KNN search", e); + } + } + + /** + * Converts Elasticsearch hits into {@link Document} instances with metadata. 
+ * + * @param total total number of hits reported by Elasticsearch + * @param searchResponse the search response containing hits + * @return list of {@code Document} objects constructed from hits + */ + private List>> getDocuments( + int total, SearchResponse> searchResponse) { + final List>> documents = new ArrayList<>(total); + for (Hit> hit : searchResponse.hits().hits()) { + final Map _source = hit.source(); + final String id = hit.id(); + final Double score = hit.score(); + final String index = hit.index(); + final Map metadata = Map.of("id", id, "score", score, "index", index); + final Document> document = new Document<>(_source, metadata, id); + documents.add(document); + } + return documents; + } +} diff --git a/integrations/vector-stores/pom.xml b/integrations/vector-stores/pom.xml new file mode 100644 index 00000000..78f49603 --- /dev/null +++ b/integrations/vector-stores/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + + org.apache.flink + flink-agents-integrations + 0.2-SNAPSHOT + + + flink-agents-integrations-vector-stores + Flink Agents : Integrations: Vector Stores + pom + + + elasticsearch + + + \ No newline at end of file From a69f7eaa592701519df54e5937f5844117e32877 Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 25 Nov 2025 23:15:17 +0900 Subject: [PATCH 08/13] chore : spotless --- .../vectorstores/elasticsearch/ElasticsearchVectorStore.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java index 43435abb..f209707b 100644 --- a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java +++ 
b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java @@ -253,7 +253,6 @@ public List>> queryEmbedding( int numCandidates = (int) args.getOrDefault("num_candidates", Math.max(100, k * 2)); String filter = (String) args.get("filter_query"); - List queryVector = new ArrayList<>(embedding.length); for (float v : embedding) queryVector.add(v); From 87a0c54d1a2b97b79a0cb51d30bd5e07ff55001b Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 25 Nov 2025 23:16:00 +0900 Subject: [PATCH 09/13] feat(examples): add ElasticsearchVectorStore RAG example with Ollama integration Add comprehensive RAG example demonstrating retrieval-augmented generation using Elasticsearch as vector store and Ollama for embeddings/chat. Includes knowledge base setup utility with sample documents and supports various authentication methods. --- examples/pom.xml | 10 + .../rag/ElasticsearchKnowledgeBaseSetup.java | 642 ++++++++++++++++++ .../examples/rag/ElasticsearchRagExample.java | 422 ++++++++++++ 3 files changed, 1074 insertions(+) create mode 100644 examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java create mode 100644 examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java diff --git a/examples/pom.xml b/examples/pom.xml index 1b8d543e..e59ff96f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -69,6 +69,16 @@ under the License. 
flink-agents-integrations-chat-models-ollama ${project.version} + + org.apache.flink + flink-agents-integrations-embedding-models-ollama + ${project.version} + + + org.apache.flink + flink-agents-integrations-vector-stores-elasticsearch + ${project.version} + diff --git a/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java new file mode 100644 index 00000000..3810add0 --- /dev/null +++ b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.examples.rag; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.CreateOperation; +import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; +import co.elastic.clients.elasticsearch.indices.CreateIndexResponse; +import co.elastic.clients.elasticsearch.indices.ExistsRequest; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.endpoints.BooleanResponse; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; + +/** + * Helper for preparing the Elasticsearch knowledge base used by the RAG example. + * + *

Main responsibilities: + * + *

+ * + *

How it is used: + * + *

{@code
+ * ElasticsearchKnowledgeBaseSetup.populate(
+ *     esHost,
+ *     index,
+ *     vectorField,
+ *     dims,
+ *     similarity,         // e.g., "cosine"
+ *     ollamaEndpoint,     // e.g., "http://localhost:11434"
+ *     embeddingModel      // e.g., "nomic-embed-text"
+ * );
+ * }
+ * + *

Authentication (System properties): + * + *

+ * + * If none of the above are provided, no authentication headers are added. + * + *

Connection defaults: + * + *

+ * + *

Index mapping expectation: + * + *

+ * + *

Batching behavior (not externally configurable): + * + *

+ * + * These defaults are internal and intentionally not exposed via system properties to keep the + * example simple and safe. + * + *

Documents source: + * + *

+ */ +public class ElasticsearchKnowledgeBaseSetup { + + public static class SampleDoc { + public final String id; + public final String content; + public final Map metadata; + + public SampleDoc(String id, String content, Map metadata) { + this.id = id; + this.content = content; + this.metadata = metadata; + } + } + + public static void populate( + String esHost, + String index, + String vectorField, + int dims, + String similarity, + String ollamaEndpoint, + String embeddingModel) + throws IOException { + + ElasticsearchClient client = buildEsClient(esHost); + ensureIndex(client, index, vectorField, dims, similarity); + + // Prepare embedding connection + OllamaEmbeddingModelConnection embeddingConn = + buildEmbeddingConnection(ollamaEndpoint, embeddingModel); + + // Load documents using a simplified loader selection + List docs = loadDocuments(); + + if (docs.isEmpty()) { + System.out.println("[KB Setup] No documents to index."); + return; + } + + int indexed = embedAndIndexAll(client, index, vectorField, embeddingConn, docs); + + System.out.printf("[KB Setup] Indexed %d sample documents into '%s'%n", indexed, index); + } + + /** Builds an embedding connection for Ollama. */ + private static OllamaEmbeddingModelConnection buildEmbeddingConnection( + String ollamaEndpoint, String embeddingModel) { + ResourceDescriptor embeddingConnDesc = + ResourceDescriptor.Builder.newBuilder( + OllamaEmbeddingModelConnection.class.getName()) + .addInitialArgument("host", ollamaEndpoint) + .addInitialArgument("model", embeddingModel) + .build(); + return new OllamaEmbeddingModelConnection(embeddingConnDesc, (name, type) -> null); + } + + /** Default internal batch sizes (not configurable via system properties). */ + private static final int DEFAULT_EMBED_BATCH = 32; + + private static final int DEFAULT_BULK_SIZE = 100; + + /** Embeds all documents in batches and performs bulk indexing. 
*/ + private static int embedAndIndexAll( + ElasticsearchClient client, + String index, + String vectorField, + OllamaEmbeddingModelConnection embeddingConn, + List docs) + throws IOException { + int indexed = 0; + for (int start = 0; start < docs.size(); start += DEFAULT_EMBED_BATCH) { + int end = Math.min(start + DEFAULT_EMBED_BATCH, docs.size()); + List batch = docs.subList(start, end); + List texts = new ArrayList<>(batch.size()); + for (SampleDoc d : batch) { + texts.add(d.content); + } + + List embeddings = embeddingConn.embed(texts, Map.of()); + indexed += bulkIndexBatch(client, index, vectorField, batch, embeddings); + } + return indexed; + } + + /** Converts float[] to List as expected by ES Java API. */ + private static List toFloatList(float[] vec) { + List list = new ArrayList<>(vec.length); + for (float v : vec) list.add(v); + return list; + } + + /** Prepares and executes bulk index requests for a batch. */ + private static int bulkIndexBatch( + ElasticsearchClient client, + String index, + String vectorField, + List batch, + List embeddings) + throws IOException { + int indexed = 0; + List ops = new ArrayList<>(); + for (int i = 0; i < batch.size(); i++) { + SampleDoc d = batch.get(i); + float[] vec = embeddings.get(i); + Map sourceDoc = new HashMap<>(); + sourceDoc.put("content", d.content); + sourceDoc.put("metadata", d.metadata); + sourceDoc.put(vectorField, toFloatList(vec)); + + CreateOperation> createOp = + CreateOperation.>of( + b -> b.index(index).id(d.id).document(sourceDoc)); + ops.add(BulkOperation.of(b -> b.create(createOp))); + + // flush if reached bulkSize + if (ops.size() >= DEFAULT_BULK_SIZE) { + indexed += executeBulk(client, ops); + ops.clear(); + } + } + if (!ops.isEmpty()) { + indexed += executeBulk(client, ops); + } + return indexed; + } + + private static int executeBulk(ElasticsearchClient client, List ops) + throws IOException { + BulkResponse resp = client.bulk(BulkRequest.of(b -> b.operations(ops))); + int success = 0; + if 
(resp.errors()) { + resp.items() + .forEach( + item -> { + if (item.error() == null) { + // created + } else if (item.status() == 409) { + // already exists due to create opType, treat as success for + // idempotency + } + }); + } + // Count successes as created or already-exists (409). The client doesn't give direct count; + // approximate by total ops minus real errors != 409 + for (var item : resp.items()) { + if (item.error() == null || item.status() == 409) { + success++; + } + } + return success; + } + + private static List loadDocuments() { + List all = new ArrayList<>(); + // Core topics + all.add( + new SampleDoc( + "flink-001", + "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.", + Map.of("source", "official", "topic", "flink"))); + all.add( + new SampleDoc( + "flink-002", + "Flink supports event-time processing with watermarks, enabling out-of-order stream handling.", + Map.of("source", "docs", "topic", "event-time"))); + all.add( + new SampleDoc( + "flink-003", + "Checkpoints and savepoints provide fault tolerance in Flink, allowing exactly-once state consistency.", + Map.of("source", "docs", "topic", "fault-tolerance"))); + all.add( + new SampleDoc( + "flink-004", + "The DataStream API offers low-level stream processing primitives with rich windowing and state APIs.", + Map.of("source", "docs", "topic", "datastream"))); + all.add( + new SampleDoc( + "flink-005", + "The Table/SQL API offers relational processing on streams and batches, integrating with the optimizer.", + Map.of("source", "docs", "topic", "table"))); + all.add( + new SampleDoc( + "flink-006", + "Flink can use RocksDB as a state backend for large-scale, disk-based keyed state.", + Map.of("source", "docs", "topic", "rocksdb"))); + all.add( + new SampleDoc( + "flink-007", + "Kafka connector enables exactly-once end-to-end semantics with transactions in Flink.", + Map.of("source", "docs", "topic", "kafka"))); + 
all.add( + new SampleDoc( + "flink-008", + "Async I/O in Flink helps hide latency of external requests by overlapping computation and IO.", + Map.of("source", "blog", "topic", "async-io"))); + all.add( + new SampleDoc( + "flink-009", + "Flink SQL integrates relational operations with streaming semantics for continuous queries.", + Map.of("source", "docs", "topic", "flink-sql"))); + all.add( + new SampleDoc( + "flink-010", + "Flink supports stateful functions and complex event processing for advanced stream applications.", + Map.of("source", "kb", "topic", "cep"))); + + // Flink Agents & RAG + all.add( + new SampleDoc( + "agents-001", + "Flink Agents integrates LLM-powered agents with Flink to build RAG, tools, and multi-agent workflows on streaming data.", + Map.of("source", "docs", "topic", "flink-agents"))); + all.add( + new SampleDoc( + "agents-002", + "Retrieval-Augmented Generation (RAG) retrieves context from a knowledge base to ground LLM responses.", + Map.of("source", "kb", "topic", "rag"))); + all.add( + new SampleDoc( + "agents-003", + "Vector stores index embeddings to enable semantic search over large corpora.", + Map.of("source", "kb", "topic", "vector-store"))); + all.add( + new SampleDoc( + "agents-004", + "Agents can call tools to interact with external systems, enabling action-based workflows.", + Map.of("source", "kb", "topic", "agents"))); + all.add( + new SampleDoc( + "agents-005", + "In RAG, chunking and retrieval quality directly affect the final answer relevance.", + Map.of("source", "kb", "topic", "rag"))); + + // Elasticsearch & vectors + all.add( + new SampleDoc( + "es-001", + "Elasticsearch supports dense_vector fields and KNN search for vector similarity queries.", + Map.of("source", "official", "topic", "elasticsearch"))); + all.add( + new SampleDoc( + "es-002", + "Cosine similarity is commonly used for normalized embedding vectors in vector search.", + Map.of("source", "kb", "topic", "similarity"))); + all.add( + new SampleDoc( + 
"es-003", + "Num candidates controls the trade-off between accuracy and speed in approximate KNN search.", + Map.of("source", "kb", "topic", "knn"))); + all.add( + new SampleDoc( + "es-004", + "Use bulk indexing to efficiently insert many documents into Elasticsearch.", + Map.of("source", "kb", "topic", "bulk"))); + all.add( + new SampleDoc( + "es-005", + "Filtering can be combined with vector search via post_filter to constrain results by metadata.", + Map.of("source", "kb", "topic", "filter"))); + + // Languages & general CS + all.add( + new SampleDoc( + "python-001", + "Python is a high-level, interpreted programming language known for its readability and extensive ecosystem.", + Map.of("source", "wiki", "topic", "python"))); + all.add( + new SampleDoc( + "java-001", + "Java is a class-based, object-oriented programming language designed to have as few implementation dependencies as possible.", + Map.of("source", "wiki", "topic", "java"))); + all.add( + new SampleDoc( + "stream-001", + "Stream processing focuses on processing data continuously as it arrives, offering low latency.", + Map.of("source", "kb", "topic", "streaming"))); + all.add( + new SampleDoc( + "batch-001", + "Batch processing handles finite datasets and is optimized for throughput over latency.", + Map.of("source", "kb", "topic", "batch"))); + all.add( + new SampleDoc( + "ml-001", + "Embeddings map text into dense vectors where semantic similarity corresponds to geometric closeness.", + Map.of("source", "kb", "topic", "embeddings"))); + all.add( + new SampleDoc( + "llm-001", + "Large language models can be grounded with retrieved context to reduce hallucinations.", + Map.of("source", "kb", "topic", "llm"))); + + // CEP and patterns + all.add( + new SampleDoc( + "cep-001", + "Flink CEP library allows pattern detection on event streams with complex conditions and time windows.", + Map.of("source", "docs", "topic", "cep"))); + all.add( + new SampleDoc( + "watermark-001", + "Watermarks are special 
markers in a stream that indicate progress of event time.", + Map.of("source", "docs", "topic", "watermark"))); + + // Add a few more generic entries + all.add( + new SampleDoc( + "ops-001", + "Flink supports native Kubernetes deployments, offering reactive scaling and high availability.", + Map.of("source", "docs", "topic", "ops"))); + all.add( + new SampleDoc( + "state-001", + "Keyed state in Flink enables per-key storage and computation, backed by efficient state backends.", + Map.of("source", "docs", "topic", "state"))); + all.add( + new SampleDoc( + "windows-001", + "Flink supports tumbling, sliding, and session windows to aggregate events over time.", + Map.of("source", "docs", "topic", "window"))); + + // Multilingual (Korean) short summaries + all.add( + new SampleDoc( + "ko-flink-001", + "Apache Flink은 무한/유한 스트림 데이터를 상태 기반으로 처리하는 분산 처리 엔진입니다.", + Map.of("source", "ko", "topic", "flink", "lang", "ko"))); + all.add( + new SampleDoc( + "ko-rag-001", + "RAG는 지식 베이스에서 관련 문서를 검색하여 LLM 응답을 보강하는 방식입니다.", + Map.of("source", "ko", "topic", "rag", "lang", "ko"))); + all.add( + new SampleDoc( + "ko-es-001", + "Elasticsearch의 dense_vector 필드는 임베딩 벡터를 저장하고 유사도 검색을 지원합니다.", + Map.of("source", "ko", "topic", "elasticsearch", "lang", "ko"))); + all.add( + new SampleDoc( + "ko-python-001", + "파이썬은 가독성이 좋고 생태계가 풍부한 고수준 프로그래밍 언어입니다.", + Map.of("source", "ko", "topic", "python", "lang", "ko"))); + all.add( + new SampleDoc( + "ko-stream-001", + "스트림 처리란 데이터가 도착하는 즉시 지속적으로 처리하는 방식으로 낮은 지연을 제공합니다.", + Map.of("source", "ko", "topic", "streaming", "lang", "ko"))); + + return all; + } + + private static List loadFromClasspath(String resourcePath) { + if (resourcePath == null || resourcePath.isEmpty()) { + System.err.println("[KB Setup] ES_POPULATE_PATH is required for classpath source."); + return List.of(); + } + try (java.io.InputStream in = + Thread.currentThread().getContextClassLoader().getResourceAsStream(resourcePath)) { + if (in == null) { + System.err.println("[KB Setup] 
Classpath resource not found: " + resourcePath); + return List.of(); + } + return parseJsonOrJsonl(new java.io.InputStreamReader(in)); + } catch (Exception e) { + System.err.println("[KB Setup] Failed to read classpath resource: " + e.getMessage()); + return List.of(); + } + } + + private static List loadFromFile(String filePath) { + if (filePath == null || filePath.isEmpty()) { + System.err.println("[KB Setup] ES_POPULATE_PATH is required for file source."); + return List.of(); + } + try (java.io.Reader reader = + java.nio.file.Files.newBufferedReader(java.nio.file.Path.of(filePath))) { + return parseJsonOrJsonl(reader); + } catch (Exception e) { + System.err.println("[KB Setup] Failed to read file: " + e.getMessage()); + return List.of(); + } + } + + private static List parseJsonOrJsonl(java.io.Reader reader) throws IOException { + List docs = new ArrayList<>(); + com.fasterxml.jackson.databind.ObjectMapper mapper = + new com.fasterxml.jackson.databind.ObjectMapper(); + // Try JSON array first + try { + List> arr = mapper.readValue(reader, List.class); + for (Map m : arr) { + String id = String.valueOf(m.getOrDefault("id", UUID.randomUUID().toString())); + String content = String.valueOf(m.get("content")); + if (content == null || content.isEmpty()) continue; + @SuppressWarnings("unchecked") + Map metadata = + (Map) m.getOrDefault("metadata", Map.of()); + docs.add(new SampleDoc(id, content, metadata)); + } + return docs; + } catch (Exception ignore) { + // Fallback to JSONL + } + + // Re-open reader as we consumed it + if (reader instanceof java.io.BufferedReader) { + // no-op + } + try (java.io.BufferedReader br = new java.io.BufferedReader(reader)) { + String line; + com.fasterxml.jackson.databind.ObjectMapper om = + new com.fasterxml.jackson.databind.ObjectMapper(); + while ((line = br.readLine()) != null) { + line = line.trim(); + if (line.isEmpty()) continue; + Map m = om.readValue(line, Map.class); + String id = String.valueOf(m.getOrDefault("id", 
UUID.randomUUID().toString())); + String content = String.valueOf(m.get("content")); + if (content == null || content.isEmpty()) continue; + @SuppressWarnings("unchecked") + Map metadata = + (Map) m.getOrDefault("metadata", Map.of()); + docs.add(new SampleDoc(id, content, metadata)); + } + } + return docs; + } + + // Removed unused exists(...) helper to simplify the class + + private static void ensureIndex( + ElasticsearchClient client, + String index, + String vectorField, + int dims, + String similarity) + throws IOException { + BooleanResponse exists = client.indices().exists(ExistsRequest.of(b -> b.index(index))); + if (exists.value()) { + return; + } + + // Build simple mapping with text + metadata + dense_vector + Map mapping = new HashMap<>(); + Map props = new HashMap<>(); + props.put("content", Map.of("type", "text")); + props.put("metadata", Map.of("type", "object", "enabled", false)); + Map vector = new HashMap<>(); + vector.put("type", "dense_vector"); + vector.put("dims", dims); + if (similarity != null && !similarity.isEmpty()) { + vector.put("similarity", similarity); + } + props.put(vectorField, vector); + mapping.put("properties", props); + + CreateIndexResponse createRes = + client.indices() + .create( + CreateIndexRequest.of( + b -> + b.index(index) + .withJson( + new java.io.StringReader( + toJson( + Map.of( + "mappings", + mapping)))))); + + if (!createRes.acknowledged()) { + throw new IOException("Failed to create index: " + index); + } + } + + private static String toJson(Map map) { + try { + return new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(map); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static ElasticsearchClient buildEsClient(String hostUrl) { + List httpHosts = new ArrayList<>(); + if (hostUrl == null || hostUrl.isEmpty()) { + hostUrl = "http://localhost:9200"; + } + httpHosts.add(HttpHost.create(hostUrl)); + + RestClientBuilder builder = 
RestClient.builder(httpHosts.toArray(new HttpHost[0])); + + // Optional auth via system properties + String apiKeyBase64 = System.getProperty("ES_API_KEY_BASE64"); + String apiKeyId = System.getProperty("ES_API_KEY_ID"); + String apiKeySecret = System.getProperty("ES_API_KEY_SECRET"); + String username = System.getProperty("ES_USERNAME"); + String password = System.getProperty("ES_PASSWORD"); + + if (apiKeyBase64 != null || (apiKeyId != null && apiKeySecret != null)) { + String token = apiKeyBase64; + if (token == null) { + String idColonSecret = apiKeyId + ":" + apiKeySecret; + token = + Base64.getEncoder() + .encodeToString(idColonSecret.getBytes(StandardCharsets.UTF_8)); + } + final Header[] defaultHeaders = + new Header[] {new BasicHeader("Authorization", "ApiKey " + token)}; + builder.setDefaultHeaders(defaultHeaders); + } else if (username != null && password != null) { + final BasicCredentialsProvider creds = new BasicCredentialsProvider(); + creds.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + builder.setHttpClientConfigCallback(hcb -> hcb.setDefaultCredentialsProvider(creds)); + } + + RestClient lowLevelClient = builder.build(); + ElasticsearchTransport transport = + new RestClientTransport(lowLevelClient, new JacksonJsonpMapper()); + return new ElasticsearchClient(transport); + } +} diff --git a/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java new file mode 100644 index 00000000..76cc3fd3 --- /dev/null +++ b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.examples.rag; + +import org.apache.flink.agents.api.Agent; +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.annotation.*; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent; +import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection; +import org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore; +import org.apache.flink.streaming.api.datastream.DataStream; +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Retrieval-Augmented Generation (RAG) example using Ollama for embeddings and chat along with + * Elasticsearch as the vector store. + * + *

This example demonstrates an agent that: + * + *

    + *
  • Embeds the incoming user query using an Ollama embedding model, + *
  • Retrieves relevant context from Elasticsearch via a {@code VectorStore}, + *
  • Formats a prompt that includes the retrieved context, and + *
  • Generates a response using an Ollama chat model. + *
+ * + *

Prerequisites: + * + *

    + *
  • Elasticsearch 8.x reachable from this example with an index that contains a {@code + * dense_vector} field for KNN search. + *
  • Ollama running locally (default {@code http://localhost:11434}) with the configured + * embedding and chat models available. + *
+ * + *

Example Elasticsearch mapping (adjust index name, field, dims, and similarity to your needs): + * + *

{@code
+ * {
+ *   "mappings": {
+ *     "properties": {
+ *       "content": { "type": "text" },
+ *       "metadata": { "type": "object", "enabled": false },
+ *       "content_vector": { "type": "dense_vector", "dims": 768, "similarity": "cosine" }
+ *     }
+ *   }
+ * }
+ * }
+ * + *

System properties you can override: + * + *

    + *
  • {@code ES_HOST} (default {@code http://localhost:9200}) + *
  • {@code ES_INDEX} (default {@code my_documents}) + *
  • {@code ES_VECTOR_FIELD} (default {@code content_vector}) + *
  • {@code ES_DIMS} (default {@code 768}) + *
  • {@code ES_SIMILARITY} (default {@code cosine}) — used by the optional population step + *
  • Authentication (optional, used by both vector store and population step): + *
      + *
    • {@code ES_API_KEY_BASE64} — Base64 of {@code apiKeyId:apiKeySecret} + *
    • {@code ES_API_KEY_ID} and {@code ES_API_KEY_SECRET} — combined and Base64-encoded + *
    • {@code ES_USERNAME} and {@code ES_PASSWORD} — basic authentication + *
    + *
  • {@code OLLAMA_ENDPOINT} (default {@code http://localhost:11434}) + *
  • {@code OLLAMA_EMBEDDING_MODEL} (default {@code nomic-embed-text}) + *
  • {@code OLLAMA_CHAT_MODEL} (default {@code qwen3:8b}) + *
  • {@code ES_POPULATE} (default {@code true}) — whether to populate sample data on startup + *
+ * + *

Direct CLI flags (optional):
+ * Instead of or in addition to system properties, you can pass flags when starting the job. CLI + * flags take precedence over existing system properties. + * + *

    + *
  • {@code --es.host=http://your-es:9200} + *
  • {@code --es.username=elastic} and {@code --es.password=secret} + *
  • {@code --es.apiKeyBase64=BASE64_ID_COLON_SECRET} or {@code --es.apiKeyId=ID} with {@code + * --es.apiKeySecret=SECRET} + *
  • {@code --es.index=my_documents}, {@code --es.vectorField=content_vector}, {@code + * --es.dims=768}, {@code --es.similarity=cosine} + *
  • {@code --ollama.endpoint=http://localhost:11434}, {@code + * --ollama.embeddingModel=nomic-embed-text}, {@code --ollama.chatModel=qwen3:8b} + *
  • {@code --es.populate=true|false} + *
+ * + *

Examples: + * + *

{@code
+ * # Use API key (base64 of id:secret) and custom host
+ * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \
+ *   examples.jar --es.host=http://es:9200 --es.apiKeyBase64=XXXXX=
+ *
+ * # Use basic authentication
+ * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \
+ *   examples.jar --es.host=http://es:9200 --es.username=elastic --es.password=secret
+ * }
+ * + *

Notes: + * + *

    + *
  • Authentication can be provided via either CLI flags or System properties; API key takes + * precedence over basic auth when both are present. + *
  • The optional knowledge base population step uses the same System properties to connect to + * Elasticsearch. + *
+ * + *

Running the example will: + * + *

    + *
  1. Optionally populate the Elasticsearch index with sample documents and stored vectors, + *
  2. Create a simple agent pipeline that retrieves context from Elasticsearch, and + *
  3. Print the model's answers for a set of example queries. + *
+ */ +public class ElasticsearchRagExample { + + public static class MyRagAgent extends Agent { + + @Prompt + public static org.apache.flink.agents.api.prompt.Prompt contextEnhancedPrompt() { + String template = + "Based on the following context, please answer the user's question.\n\n" + + "Context:\n{context}\n\n" + + "User Question:\n{user_query}\n\n" + + "Please provide a helpful answer based on the context provided."; + return new org.apache.flink.agents.api.prompt.Prompt(template); + } + + @EmbeddingModelSetup + public static ResourceDescriptor textEmbedder() { + // Embedding setup referencing the embedding connection name + return ResourceDescriptor.Builder.newBuilder( + OllamaEmbeddingModelConnection.class.getName()) + .addInitialArgument("connection", "ollamaEmbeddingConnection") + .addInitialArgument( + "model", + System.getProperty("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")) + .build(); + } + + @ChatModelConnection + public static ResourceDescriptor ollamaChatModelConnection() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) + .addInitialArgument( + "endpoint", + System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434")) + .addInitialArgument("requestTimeout", 120) + .build(); + } + + @ChatModelSetup + public static ResourceDescriptor chatModel() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName()) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument( + "model", System.getProperty("OLLAMA_CHAT_MODEL", "qwen3:8b")) + .build(); + } + + @VectorStore + public static ResourceDescriptor knowledgeBase() { + ResourceDescriptor.Builder builder = + ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName()) + .addInitialArgument("embedding_model", "textEmbedder") + .addInitialArgument( + "index", System.getProperty("ES_INDEX", "my_documents")) + .addInitialArgument( + "vector_field", + System.getProperty("ES_VECTOR_FIELD", 
"content_vector")) + .addInitialArgument("dims", Integer.getInteger("ES_DIMS", 768)) + .addInitialArgument( + "host", System.getProperty("ES_HOST", "http://localhost:9200")); + + // Optional authentication + String apiKeyBase64 = System.getProperty("ES_API_KEY_BASE64"); + String apiKeyId = System.getProperty("ES_API_KEY_ID"); + String apiKeySecret = System.getProperty("ES_API_KEY_SECRET"); + String username = System.getProperty("ES_USERNAME"); + String password = System.getProperty("ES_PASSWORD"); + + if (apiKeyBase64 != null && !apiKeyBase64.isEmpty()) { + builder.addInitialArgument("api_key_base64", apiKeyBase64); + } else if (apiKeyId != null + && apiKeySecret != null + && !apiKeyId.isEmpty() + && !apiKeySecret.isEmpty()) { + builder.addInitialArgument("api_key_id", apiKeyId) + .addInitialArgument("api_key_secret", apiKeySecret); + } else if (username != null + && password != null + && !username.isEmpty() + && !password.isEmpty()) { + builder.addInitialArgument("username", username) + .addInitialArgument("password", password); + } + + return builder.build(); + } + + /** + * Converts an incoming {@link InputEvent} into a {@link ContextRetrievalRequestEvent} that + * asks the vector store to fetch relevant documents for the input string. The vector store + * resource is referenced by name ({@code "knowledgeBase"}). + */ + @Action(listenEvents = {InputEvent.class}) + public static void processInput(InputEvent event, RunnerContext ctx) { + ctx.sendEvent( + new ContextRetrievalRequestEvent((String) event.getInput(), "knowledgeBase")); + } + + /** + * Receives retrieved documents from the vector store, constructs a context string, formats + * the prompt using the {@code contextEnhancedPrompt}, and emits a {@link ChatRequestEvent} + * targeting the configured chat model. 
+ * + * @param event contains the user query and the list of retrieved documents + * @param context provides access to resources (e.g., the prompt) and lets the agent send + * the next event + */ + @Action(listenEvents = {ContextRetrievalResponseEvent.class}) + public static void processRetrievedContext( + ContextRetrievalResponseEvent> event, RunnerContext context) + throws Exception { + final String userQuery = event.getQuery(); + final List>> docs = event.getDocuments(); + + // Build context text from retrieved documents + List items = new ArrayList<>(); + for (int i = 0; i < docs.size(); i++) { + Object content = docs.get(i).getContent(); + items.add(String.format("%d. %s", i + 1, content)); + } + String contextText = String.join("\n\n", items); + + // Format enhanced prompt + org.apache.flink.agents.api.prompt.Prompt prompt = + (org.apache.flink.agents.api.prompt.Prompt) + context.getResource("contextEnhancedPrompt", ResourceType.PROMPT); + + String enhanced = + prompt.formatString( + Map.of( + "context", contextText, + "user_query", userQuery)); + + // Send chat request + ChatMessage userMsg = new ChatMessage(MessageRole.USER, enhanced); + context.sendEvent(new ChatRequestEvent("chatModel", List.of(userMsg))); + } + + /** + * Handles the final {@link ChatResponseEvent} from the model and forwards the text back as + * an {@link OutputEvent} to the outside world. + */ + @Action(listenEvents = ChatResponseEvent.class) + public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx) { + String output = event.getResponse() != null ? event.getResponse().getContent() : ""; + ctx.sendEvent(new OutputEvent(output)); + } + } + + /** + * Entry point for the example. Optionally populates the Elasticsearch index with sample data + * and then builds a simple DataStream pipeline that sends example queries through the RAG + * agent. + * + *

System properties used by the optional population step: + * + *

    + *
  • {@code ES_POPULATE} — whether to insert sample docs and vectors (default {@code true}) + *
  • {@code ES_HOST}, {@code ES_INDEX}, {@code ES_VECTOR_FIELD}, {@code ES_DIMS}, {@code + * ES_SIMILARITY} + *
  • {@code OLLAMA_ENDPOINT}, {@code OLLAMA_EMBEDDING_MODEL} + *
+ */ + public static void main(String[] args) throws Exception { + System.out.println("Starting Elasticsearch RAG Example..."); + + // Parse CLI arguments and set System properties so both the knowledgeBase() and + // the optional knowledge base population step can use them. + parseArgsAndSetProperties(args); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // Optionally populate ES with sample data for the demo + if (Boolean.parseBoolean(System.getProperty("ES_POPULATE", "true"))) { + String esHost = System.getProperty("ES_HOST", "http://localhost:9200"); + String index = System.getProperty("ES_INDEX", "my_documents"); + String vectorField = System.getProperty("ES_VECTOR_FIELD", "content_vector"); + int dims = Integer.getInteger("ES_DIMS", 768); + String similarity = System.getProperty("ES_SIMILARITY", "cosine"); + String ollamaEndpoint = System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434"); + String embeddingModel = + System.getProperty("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text"); + + try { + ElasticsearchKnowledgeBaseSetup.populate( + esHost, + index, + vectorField, + dims, + similarity, + ollamaEndpoint, + embeddingModel); + } catch (Exception e) { + System.err.println( + "[KB Setup] Failed to populate ES sample data: " + e.getMessage()); + e.printStackTrace(); + } + } + + // Prepare example queries as a DataStream + DataStream queries = + env.fromData( + "What is Apache Flink?", + "What is Apache Flink Agents?", + "What is Python?", + "What is vector store?", + "Does flink supports k8s?", + "What is the capability of agentic system"); + + MyRagAgent agent = new MyRagAgent(); + + // Use DataStream pipeline instead of local list execution + agentsEnv.fromDataStream(queries).apply(agent).toDataStream().print(); + + // Execute the Flink job + agentsEnv.execute(); + } + + /** + * Very small CLI parser 
for convenience so you can pass ES host and credentials directly. + * Supported flags (CLI args have precedence over existing System properties): --es.host=URL, + * --es.username=USER, --es.password=PASS, --es.apiKeyBase64=BASE64, --es.apiKeyId=ID, + * --es.apiKeySecret=SECRET, plus optional index settings like --es.index, --es.vectorField, + * --es.dims, --es.similarity, and Ollama settings --ollama.endpoint, --ollama.embeddingModel, + * --ollama.chatModel. + */ + private static void parseArgsAndSetProperties(String[] args) { + if (args == null) return; + for (String a : args) { + if (a == null) continue; + if (!a.startsWith("--")) continue; + int eq = a.indexOf('='); + String key; + String val = ""; + if (eq > 2) { + key = a.substring(2, eq); + val = a.substring(eq + 1); + } else { + key = a.substring(2); + } + + if ("es.host".equals(key)) { + System.setProperty("ES_HOST", val); + } else if ("es.username".equals(key)) { + System.setProperty("ES_USERNAME", val); + } else if ("es.password".equals(key)) { + System.setProperty("ES_PASSWORD", val); + } else if ("es.apiKeyBase64".equals(key)) { + System.setProperty("ES_API_KEY_BASE64", val); + } else if ("es.apiKeyId".equals(key)) { + System.setProperty("ES_API_KEY_ID", val); + } else if ("es.apiKeySecret".equals(key)) { + System.setProperty("ES_API_KEY_SECRET", val); + } else if ("es.index".equals(key)) { + System.setProperty("ES_INDEX", val); + } else if ("es.vectorField".equals(key)) { + System.setProperty("ES_VECTOR_FIELD", val); + } else if ("es.dims".equals(key)) { + System.setProperty("ES_DIMS", val); + } else if ("es.similarity".equals(key)) { + System.setProperty("ES_SIMILARITY", val); + } else if ("ollama.endpoint".equals(key)) { + System.setProperty("OLLAMA_ENDPOINT", val); + } else if ("ollama.embeddingModel".equals(key)) { + System.setProperty("OLLAMA_EMBEDDING_MODEL", val); + } else if ("ollama.chatModel".equals(key)) { + System.setProperty("OLLAMA_CHAT_MODEL", val); + } else if 
("es.populate".equals(key)) { + System.setProperty("ES_POPULATE", val); + } else { + // ignore unknown flags + } + } + } +} From 7c32ebe23526cb3d2c26092b252f640dbd8f1153 Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 30 Nov 2025 15:05:21 +0900 Subject: [PATCH 10/13] refactor: update embedding model architecture to use setup --- .../api/vectorstores/BaseVectorStore.java | 6 +++--- .../examples/rag/ElasticsearchRagExample.java | 17 +++++++++++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java index 3d65e866..5a0d65f3 100644 --- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java @@ -18,7 +18,7 @@ package org.apache.flink.agents.api.vectorstores; -import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelConnection; +import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup; import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceType; @@ -66,8 +66,8 @@ public ResourceType getResourceType() { * @return VectorStoreQueryResult containing the retrieved documents */ public VectorStoreQueryResult query(VectorStoreQuery query) { - final BaseEmbeddingModelConnection embeddingModel = - (BaseEmbeddingModelConnection) + final BaseEmbeddingModelSetup embeddingModel = + (BaseEmbeddingModelSetup) this.getResource.apply(this.embeddingModel, ResourceType.EMBEDDING_MODEL); // TODO diff --git a/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java index 76cc3fd3..fb7e2033 100644 --- 
a/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java @@ -36,6 +36,7 @@ import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup; import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelSetup; import org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -159,18 +160,26 @@ public static org.apache.flink.agents.api.prompt.Prompt contextEnhancedPrompt() return new org.apache.flink.agents.api.prompt.Prompt(template); } - @EmbeddingModelSetup - public static ResourceDescriptor textEmbedder() { - // Embedding setup referencing the embedding connection name + @EmbeddingModelConnection + public static ResourceDescriptor textEmbedderConnection() { return ResourceDescriptor.Builder.newBuilder( OllamaEmbeddingModelConnection.class.getName()) - .addInitialArgument("connection", "ollamaEmbeddingConnection") + .addInitialArgument( + "host", System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434")) .addInitialArgument( "model", System.getProperty("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")) .build(); } + @EmbeddingModelSetup + public static ResourceDescriptor textEmbedder() { + // Embedding setup referencing the embedding connection name + return ResourceDescriptor.Builder.newBuilder(OllamaEmbeddingModelSetup.class.getName()) + .addInitialArgument("connection", "textEmbedderConnection") + .build(); + } + @ChatModelConnection public static ResourceDescriptor ollamaChatModelConnection() { return 
ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) From f6ed59f2418b5d489f291b21ed5523d19b38b8f7 Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 30 Nov 2025 15:07:26 +0900 Subject: [PATCH 11/13] refactor(plan): remove ContextRetrievalResponseEvent from action mapping --- .../flink/agents/plan/actions/ContextRetrievalAction.java | 4 +--- .../test/java/org/apache/flink/agents/plan/AgentPlanTest.java | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java index b9544fde..0f84b345 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java @@ -40,9 +40,7 @@ public static Action getContextRetrievalAction() throws Exception { ContextRetrievalAction.class, "processContextRetrievalRequest", new Class[] {Event.class, RunnerContext.class}), - List.of( - ContextRetrievalRequestEvent.class.getName(), - ContextRetrievalResponseEvent.class.getName())); + List.of(ContextRetrievalRequestEvent.class.getName())); } @SuppressWarnings("unchecked") diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java index 45a30e21..07e3bc12 100644 --- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java +++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java @@ -164,7 +164,7 @@ public void testConstructorWithAgent() throws Exception { assertThat(multiAction.getExec()).isInstanceOf(JavaFunction.class); // Verify actionsByEvent mapping - assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(8); + assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(7); // Check InputEvent mapping List inputEventActions = @@ -200,7 +200,7 @@ 
public void testConstructorWithAgentNoActions() throws Exception { // Verify that no actions were collected assertThat(agentPlan.getActions().size()).isEqualTo(3); - assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(5); + assertThat(agentPlan.getActionsByEvent().size()).isEqualTo(4); } @Test From e60ccfafb8b75b871f4aa040b5fad4a40119f0fa Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 30 Nov 2025 15:09:07 +0900 Subject: [PATCH 12/13] refactor: remove unnecessary cast in ContextRetrievalAction --- .../flink/agents/plan/actions/ContextRetrievalAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java index 0f84b345..7a6e3e4f 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java @@ -59,7 +59,7 @@ public static void processContextRetrievalRequest(Event event, Runne final VectorStoreQuery vectorStoreQuery = new VectorStoreQuery( contextRetrievalRequestEvent.getQuery(), - ((ContextRetrievalRequestEvent) event).getMaxResults()); + contextRetrievalRequestEvent.getMaxResults()); final VectorStoreQueryResult result = vectorStore.query(vectorStoreQuery); From ff701bf5561079a68bb0cc655d498819532c327b Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 30 Nov 2025 15:34:16 +0900 Subject: [PATCH 13/13] test: update expected action counts in agent plan compatibility test --- .../tests/compatibility/create_python_agent_plan_from_json.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/flink_agents/plan/tests/compatibility/create_python_agent_plan_from_json.py b/python/flink_agents/plan/tests/compatibility/create_python_agent_plan_from_json.py index ff940d85..a04eaebb 100644 --- 
a/python/flink_agents/plan/tests/compatibility/create_python_agent_plan_from_json.py +++ b/python/flink_agents/plan/tests/compatibility/create_python_agent_plan_from_json.py @@ -32,7 +32,7 @@ agent_plan = AgentPlan.model_validate_json(java_plan_json) actions = agent_plan.actions - assert len(actions) == 4 + assert len(actions) == 5 event = "org.apache.flink.agents.api.Event" input_event = "org.apache.flink.agents.api.InputEvent" @@ -72,7 +72,7 @@ # check actions_by_event actions_by_event = agent_plan.actions_by_event - assert len(actions_by_event) == 5 + assert len(actions_by_event) == 6 assert input_event in actions_by_event assert sorted(actions_by_event[input_event]) == ["firstAction", "secondAction"]