diff --git a/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java b/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java
new file mode 100644
index 00000000..cc7b0928
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/annotation/VectorStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to mark a field or method as a vector store resource that should be managed by the
+ * agent plan.
+ *
+ * <p>Fields annotated with {@code @VectorStore} will be scanned during agent plan creation and
+ * corresponding resource providers will be created to manage the vector store instances. Methods
+ * annotated with {@code @VectorStore} must be static and will be wrapped as vector store resources.
+ *
+ * <p>The annotation is retained at runtime and may only be placed on fields and methods.
+ */
+@Target({ElementType.FIELD, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface VectorStore {}
diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java
new file mode 100644
index 00000000..8f9ea9ca
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.event;
+
+import org.apache.flink.agents.api.Event;
+
+/**
+ * Event representing a request for context retrieval.
+ *
+ * <p>Carries the query text, the identifier of the vector store to search and the maximum number
+ * of results wanted. When no maximum is given, {@code 3} is used.
+ */
+public class ContextRetrievalRequestEvent extends Event {
+
+    /** Result cap applied when the caller does not specify one. */
+    private static final int DEFAULT_MAX_RESULTS = 3;
+
+    private final String query;
+    private final String vectorStore;
+    private final int maxResults;
+
+    /**
+     * Creates a request that uses the default result cap.
+     *
+     * @param query the query text
+     * @param vectorStore the identifier of the vector store to search
+     */
+    public ContextRetrievalRequestEvent(String query, String vectorStore) {
+        this(query, vectorStore, DEFAULT_MAX_RESULTS);
+    }
+
+    /**
+     * Creates a request with an explicit result cap.
+     *
+     * @param query the query text
+     * @param vectorStore the identifier of the vector store to search
+     * @param maxResults the maximum number of results to retrieve
+     */
+    public ContextRetrievalRequestEvent(String query, String vectorStore, int maxResults) {
+        this.maxResults = maxResults;
+        this.vectorStore = vectorStore;
+        this.query = query;
+    }
+
+    /** Returns the query text. */
+    public String getQuery() {
+        return this.query;
+    }
+
+    /** Returns the identifier of the vector store to search. */
+    public String getVectorStore() {
+        return this.vectorStore;
+    }
+
+    /** Returns the maximum number of results to retrieve. */
+    public int getMaxResults() {
+        return this.maxResults;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ContextRetrievalRequestEvent{");
+        text.append("query='").append(query).append('\'');
+        text.append(", vectorStore='").append(vectorStore).append('\'');
+        text.append(", maxResults=").append(maxResults);
+        text.append('}');
+        return text.toString();
+    }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
new file mode 100644
index 00000000..195c12e8
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.event;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.vectorstores.Document;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Event representing retrieved context results.
+ *
+ * @param
+ */
+public class ContextRetrievalResponseEvent extends Event {
+
+ private final UUID requestId;
+ private final String query;
+ private final List> documents;
+
+ public ContextRetrievalResponseEvent(
+ UUID requestId, String query, List> documents) {
+ this.requestId = requestId;
+ this.query = query;
+ this.documents = documents;
+ }
+
+ public UUID getRequestId() {
+ return requestId;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public List> getDocuments() {
+ return documents;
+ }
+
+ @Override
+ public String toString() {
+ return "ContextRetrievalResponseEvent{"
+ + "requestId="
+ + requestId
+ + ", query='"
+ + query
+ + '\''
+ + ", documents="
+ + documents
+ + '}';
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
new file mode 100644
index 00000000..5a0d65f3
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Base abstract class for vector store. Provides vector store functionality that integrates
+ * embedding models for text-based semantic search. Handles both connection management and embedding
+ * generation internally.
+ *
+ * @param <ContentT> The type of content stored in the vector store documents
+ */
+public abstract class BaseVectorStore<ContentT> extends Resource {
+
+    /** Name of the embedding model resource to use. */
+    protected final String embeddingModel;
+
+    /**
+     * Creates the vector store and reads the {@code embedding_model} argument from the descriptor.
+     *
+     * @param descriptor the resource descriptor carrying the store arguments
+     * @param getResource lookup used to resolve other resources by name and type
+     */
+    public BaseVectorStore(
+            ResourceDescriptor descriptor,
+            BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+        this.embeddingModel = descriptor.getArgument("embedding_model");
+    }
+
+    @Override
+    public ResourceType getResourceType() {
+        return ResourceType.VECTOR_STORE;
+    }
+
+    /**
+     * Returns vector store setup settings passed to connection. These parameters are merged with
+     * query-specific parameters when performing vector search operations.
+     *
+     * @return A map containing the store configuration parameters
+     */
+    public abstract Map<String, Object> getStoreKwargs();
+
+    /**
+     * Performs vector search using structured query object. Converts text query to embeddings and
+     * returns structured query result.
+     *
+     * <p>The store settings and the query's extra arguments are merged into a fresh map, with the
+     * query's arguments taking precedence. The map returned by {@link #getStoreKwargs()} is never
+     * modified. The query limit is unboxed to an {@code int}, so it must not be {@code null}.
+     *
+     * @param query VectorStoreQuery object containing query parameters
+     * @return VectorStoreQueryResult containing the retrieved documents
+     */
+    public VectorStoreQueryResult<ContentT> query(VectorStoreQuery query) {
+        final BaseEmbeddingModelSetup embeddingSetup =
+                (BaseEmbeddingModelSetup)
+                        this.getResource.apply(this.embeddingModel, ResourceType.EMBEDDING_MODEL);
+
+        // TODO
+        // for now, we don't need to use additional parameters.
+        final float[] queryEmbedding = embeddingSetup.embed(query.getQueryText(), Map.of());
+
+        // Merge into a copy: the store's own map may be immutable or shared between queries.
+        final Map<String, Object> searchArgs = new HashMap<>();
+        final Map<String, Object> storeKwargs = this.getStoreKwargs();
+        if (storeKwargs != null) {
+            searchArgs.putAll(storeKwargs);
+        }
+        final Map<String, Object> extraArgs = query.getExtraArgs();
+        if (extraArgs != null) {
+            searchArgs.putAll(extraArgs);
+        }
+
+        final List<Document<ContentT>> documents =
+                this.queryEmbedding(queryEmbedding, query.getLimit(), searchArgs);
+
+        return new VectorStoreQueryResult<>(documents);
+    }
+
+    /**
+     * Performs vector search using a pre-computed embedding.
+     *
+     * @param embedding The embedding vector to search with
+     * @param limit Maximum number of results to return
+     * @param args Additional arguments for the vector search
+     * @return List of documents matching the query embedding
+     */
+    public abstract List<Document<ContentT>> queryEmbedding(
+            float[] embedding, int limit, Map<String, Object> args);
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
new file mode 100644
index 00000000..887651ee
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores;
+
+import java.util.Map;
+
+/**
+ * A document retrieved from vector store search.
+ *
+ * <p>Represents a single piece of content with associated metadata. This class is generic to
+ * support different content types while maintaining consistent metadata and identification
+ * structure.
+ *
+ * @param <ContentT> the type of the document content
+ */
+public class Document<ContentT> {
+
+    /** Unique identifier of the document (if available). */
+    private final String id;
+
+    /** The actual content of the document. */
+    private final ContentT content;
+
+    /** Document metadata such as source, author, timestamp, etc. */
+    private final Map<String, Object> metadata;
+
+    /**
+     * Creates a document.
+     *
+     * @param content the actual content of the document
+     * @param metadata the document metadata
+     * @param id the unique identifier of the document, if available
+     */
+    public Document(ContentT content, Map<String, Object> metadata, String id) {
+        this.content = content;
+        this.metadata = metadata;
+        this.id = id;
+    }
+
+    /** Returns the document content. */
+    public ContentT getContent() {
+        return content;
+    }
+
+    /** Returns the document metadata. */
+    public Map<String, Object> getMetadata() {
+        return metadata;
+    }
+
+    /** Returns the document identifier. */
+    public String getId() {
+        return id;
+    }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
new file mode 100644
index 00000000..af31dc6f
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parameters for querying a {@link BaseVectorStore}.
+ *
+ * A query consists of a textual prompt that will be embedded (for semantic search) or used as a
+ * keyword string, an optional {@code limit} for the number of results, a {@link
+ * VectorStoreQueryMode}, and an optional map of store-specific arguments.
+ */
+public class VectorStoreQuery {
+
+ /** How the query should be executed (semantic, keyword, or hybrid). */
+ private final VectorStoreQueryMode mode;
+ /** The user-provided query text. */
+ private final String queryText;
+ /** Maximum number of documents to return. */
+ private final Integer limit;
+ /** Additional store-specific parameters. */
+ private final Map extraArgs;
+
+ /**
+ * Creates a semantic-search query with default mode {@link VectorStoreQueryMode#SEMANTIC}.
+ *
+ * @param queryText the text to embed and search for
+ * @param limit maximum number of results to return
+ */
+ public VectorStoreQuery(String queryText, Integer limit) {
+ this.mode = VectorStoreQueryMode.SEMANTIC;
+ this.queryText = queryText;
+ this.limit = limit;
+ this.extraArgs = new HashMap<>();
+ }
+
+ /**
+ * Creates a query with explicit mode and extra arguments.
+ *
+ * @param mode the query mode
+ * @param queryText the text to search for
+ * @param limit maximum number of results to return
+ * @param extraArgs store-specific additional parameters
+ */
+ public VectorStoreQuery(
+ VectorStoreQueryMode mode,
+ String queryText,
+ Integer limit,
+ Map extraArgs) {
+ this.mode = mode;
+ this.queryText = queryText;
+ this.limit = limit;
+ this.extraArgs = extraArgs;
+ }
+
+ /** Returns the query mode. */
+ public VectorStoreQueryMode getMode() {
+ return mode;
+ }
+
+ /** Returns the query text. */
+ public String getQueryText() {
+ return queryText;
+ }
+
+ /** Returns the requested result limit. */
+ public Integer getLimit() {
+ return limit;
+ }
+
+ /** Returns extra store-specific arguments. */
+ public Map getExtraArgs() {
+ return extraArgs;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java
new file mode 100644
index 00000000..301ca759
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores;
+
+/**
+ * Query execution modes for vector stores.
+ *
+ * <ul>
+ *   <li>{@link #SEMANTIC}: Use dense vector embeddings and similarity search.
+ *   <li>{@link #KEYWORD}: Use keyword or lexical search when supported by the store.
+ *   <li>{@link #HYBRID}: Combine semantic and keyword search strategies.
+ * </ul>
+ */
+public enum VectorStoreQueryMode {
+    /** Semantic similarity search using embeddings. */
+    SEMANTIC,
+    /** Keyword/lexical search (store dependent). */
+    KEYWORD,
+    /** Hybrid search combining semantic and keyword results. */
+    HYBRID;
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java
new file mode 100644
index 00000000..bb10e63d
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores;
+
+import java.util.List;
+
+/**
+ * Result of a vector store query.
+ *
+ * <p>Wraps the list of {@link Document} instances returned by a {@link BaseVectorStore} query in a
+ * simple value object.
+ *
+ * @param <ContentT> the type of the document content in the result
+ */
+public class VectorStoreQueryResult<ContentT> {
+    /** The documents matched by the query, ordered by relevance. */
+    private final List<Document<ContentT>> documents;
+
+    /**
+     * Creates a new query result.
+     *
+     * @param documents the matched documents
+     */
+    public VectorStoreQueryResult(List<Document<ContentT>> documents) {
+        this.documents = documents;
+    }
+
+    /** Returns the matched documents. */
+    public List<Document<ContentT>> getDocuments() {
+        return documents;
+    }
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index 1b8d543e..e59ff96f 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -69,6 +69,16 @@ under the License.
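             <artifactId>flink-agents-integrations-chat-models-ollama</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
+            <version>${project.version}</version>
+        </dependency>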
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java
new file mode 100644
index 00000000..3810add0
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchKnowledgeBaseSetup.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples.rag;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.endpoints.BooleanResponse;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Helper for preparing the Elasticsearch knowledge base used by the RAG example.
+ *
+ * <p>Main responsibilities:
+ *
+ * <ul>
+ *   <li>Create the target index with a simple mapping that includes a {@code dense_vector} field
+ *       (if the index does not exist).
+ *   <li>Load a small set of built-in sample documents.
+ *   <li>Generate embeddings via an Ollama embedding model and bulk index documents along with
+ *       their vectors.
+ * </ul>
+ *
+ * <p>How it is used:
+ *
+ * <pre>{@code
+ * ElasticsearchKnowledgeBaseSetup.populate(
+ *     esHost,
+ *     index,
+ *     vectorField,
+ *     dims,
+ *     similarity,      // e.g., "cosine"
+ *     ollamaEndpoint,  // e.g., "http://localhost:11434"
+ *     embeddingModel   // e.g., "nomic-embed-text"
+ * );
+ * }</pre>
+ *
+ * <p>Authentication (System properties):
+ *
+ * <ul>
+ *   <li>{@code ES_API_KEY_BASE64} — Base64 of {@code apiKeyId:apiKeySecret} (takes precedence)
+ *   <li>{@code ES_API_KEY_ID} and {@code ES_API_KEY_SECRET} — will be combined and Base64-encoded
+ *   <li>{@code ES_USERNAME} and {@code ES_PASSWORD} — basic authentication fallback
+ * </ul>
+ *
+ * <p>If none of the above are provided, no authentication headers are added.
+ *
+ * <p>Connection defaults:
+ *
+ * <ul>
+ *   <li>Elasticsearch host defaults to {@code http://localhost:9200} when not specified.
+ *   <li>Ollama endpoint is passed in via the {@code populate(...)} arguments.
+ * </ul>
+ *
+ * <p>Index mapping expectation:
+ *
+ * <ul>
+ *   <li>This helper will create the index if missing with the following fields:
+ *       <ul>
+ *         <li>{@code content} — {@code text}
+ *         <li>{@code metadata} — {@code object} with {@code enabled=false}
+ *         <li>{@code <vectorField>} — {@code dense_vector} with {@code dims} and optional {@code
+ *             similarity}
+ *       </ul>
+ * </ul>
+ *
+ * <p>Batching behavior (not externally configurable):
+ *
+ * <ul>
+ *   <li>Embedding batch size: {@value #DEFAULT_EMBED_BATCH}
+ *   <li>Bulk indexing batch size: {@value #DEFAULT_BULK_SIZE}
+ * </ul>
+ *
+ * <p>These defaults are internal and intentionally not exposed via system properties to keep the
+ * example simple and safe.
+ *
+ * <p>Documents source:
+ *
+ * <ul>
+ *   <li>By default, a small built-in set of topical documents is used to make the example runnable
+ *       out of the box.
+ * </ul>
+ */
+public class ElasticsearchKnowledgeBaseSetup {
+
+    /**
+     * Simple immutable holder for one sample document to be indexed: an identifier, the text
+     * content that gets embedded, and a metadata map stored alongside it.
+     */
+    public static class SampleDoc {
+        /** Identifier of the document. */
+        public final String id;
+        /** Text content; this is the text sent to the embedding model. */
+        public final String content;
+        /** Metadata written to the {@code metadata} field of the indexed document. */
+        public final Map metadata;
+
+        /**
+         * Creates a sample document.
+         *
+         * @param id identifier of the document
+         * @param content text content of the document
+         * @param metadata metadata stored with the document
+         */
+        public SampleDoc(String id, String content, Map metadata) {
+            this.id = id;
+            this.content = content;
+            this.metadata = metadata;
+        }
+    }
+
+    /**
+     * Prepares the knowledge base: makes sure the index exists, loads the sample documents,
+     * embeds them through Ollama and bulk indexes them together with their vectors.
+     *
+     * <p>The Elasticsearch transport created for this call is closed before the method returns,
+     * whether it completes normally, returns early or throws.
+     *
+     * @param esHost the Elasticsearch host to connect to
+     * @param index the name of the target index
+     * @param vectorField the name of the field holding the embedding vector
+     * @param dims the dimensionality of the embedding vectors
+     * @param similarity the similarity function for the vector field, e.g. {@code "cosine"}
+     * @param ollamaEndpoint the Ollama endpoint, e.g. {@code "http://localhost:11434"}
+     * @param embeddingModel the embedding model name, e.g. {@code "nomic-embed-text"}
+     * @throws IOException if talking to Elasticsearch fails
+     */
+    public static void populate(
+            String esHost,
+            String index,
+            String vectorField,
+            int dims,
+            String similarity,
+            String ollamaEndpoint,
+            String embeddingModel)
+            throws IOException {
+
+        ElasticsearchClient client = buildEsClient(esHost);
+        // Closing the transport releases the underlying HTTP client and its threads.
+        try (ElasticsearchTransport transport = client._transport()) {
+            ensureIndex(client, index, vectorField, dims, similarity);
+
+            // Prepare embedding connection
+            OllamaEmbeddingModelConnection embeddingConn =
+                    buildEmbeddingConnection(ollamaEndpoint, embeddingModel);
+
+            // Load documents using a simplified loader selection
+            List<SampleDoc> docs = loadDocuments();
+
+            if (docs.isEmpty()) {
+                System.out.println("[KB Setup] No documents to index.");
+                return;
+            }
+
+            int indexed = embedAndIndexAll(client, index, vectorField, embeddingConn, docs);
+
+            System.out.printf(
+                    "[KB Setup] Indexed %d sample documents into '%s'%n", indexed, index);
+        }
+    }
+
+ /** Builds an embedding connection for Ollama. */
+ private static OllamaEmbeddingModelConnection buildEmbeddingConnection(
+ String ollamaEndpoint, String embeddingModel) {
+ ResourceDescriptor embeddingConnDesc =
+ ResourceDescriptor.Builder.newBuilder(
+ OllamaEmbeddingModelConnection.class.getName())
+ .addInitialArgument("host", ollamaEndpoint)
+ .addInitialArgument("model", embeddingModel)
+ .build();
+ return new OllamaEmbeddingModelConnection(embeddingConnDesc, (name, type) -> null);
+ }
+
+    /**
+     * Number of documents whose texts are sent to the embedding model in one call. Internal
+     * default, not configurable via system properties.
+     */
+    private static final int DEFAULT_EMBED_BATCH = 32;
+
+    /**
+     * Bulk indexing batch size. Internal default, not configurable via system properties.
+     * NOTE(review): its use is outside the visible part of this file — confirm where it applies.
+     */
+    private static final int DEFAULT_BULK_SIZE = 100;
+
+    /**
+     * Embeds all documents in batches and performs bulk indexing.
+     *
+     * <p>Documents are processed in slices of at most {@code DEFAULT_EMBED_BATCH}; each slice is
+     * embedded with a single call and then handed to {@code bulkIndexBatch}.
+     *
+     * @param client the Elasticsearch client used for indexing
+     * @param index the name of the target index
+     * @param vectorField the name of the field holding the embedding vector
+     * @param embeddingConn the connection used to compute the embeddings
+     * @param docs the documents to embed and index
+     * @return the sum of the counts reported by {@code bulkIndexBatch}
+     * @throws IOException if indexing fails
+     * @throws IllegalStateException if the embedding model returns a different number of vectors
+     *     than texts were submitted
+     */
+    private static int embedAndIndexAll(
+            ElasticsearchClient client,
+            String index,
+            String vectorField,
+            OllamaEmbeddingModelConnection embeddingConn,
+            List<SampleDoc> docs)
+            throws IOException {
+        int indexed = 0;
+        for (int start = 0; start < docs.size(); start += DEFAULT_EMBED_BATCH) {
+            int end = Math.min(start + DEFAULT_EMBED_BATCH, docs.size());
+            List<SampleDoc> batch = docs.subList(start, end);
+            List<String> texts = new ArrayList<>(batch.size());
+            for (SampleDoc d : batch) {
+                texts.add(d.content);
+            }
+
+            List<float[]> embeddings = embeddingConn.embed(texts, Map.of());
+            // Vectors are matched to documents by position, so the counts have to agree.
+            if (embeddings.size() != batch.size()) {
+                throw new IllegalStateException(
+                        "Expected "
+                                + batch.size()
+                                + " embeddings but received "
+                                + embeddings.size());
+            }
+            indexed += bulkIndexBatch(client, index, vectorField, batch, embeddings);
+        }
+        return indexed;
+    }
+
+    /**
+     * Converts a {@code float[]} to a {@code List<Float>} as expected by the ES Java API.
+     *
+     * @param vec the vector to convert; must not be {@code null}
+     * @return a new mutable list holding the boxed values of {@code vec} in the same order
+     */
+    private static List<Float> toFloatList(float[] vec) {
+        List<Float> list = new ArrayList<>(vec.length);
+        for (float v : vec) {
+            list.add(v);
+        }
+        return list;
+    }
+
+ /** Prepares and executes bulk index requests for a batch. */
+ private static int bulkIndexBatch(
+ ElasticsearchClient client,
+ String index,
+ String vectorField,
+ List batch,
+ List embeddings)
+ throws IOException {
+ int indexed = 0;
+ List ops = new ArrayList<>();
+ for (int i = 0; i < batch.size(); i++) {
+ SampleDoc d = batch.get(i);
+ float[] vec = embeddings.get(i);
+ Map sourceDoc = new HashMap<>();
+ sourceDoc.put("content", d.content);
+ sourceDoc.put("metadata", d.metadata);
+ sourceDoc.put(vectorField, toFloatList(vec));
+
+ CreateOperation