From 796645ba8ed52dbd0c17ba74de3be0255c498247 Mon Sep 17 00:00:00 2001 From: HJK181 Date: Thu, 6 Aug 2020 15:05:38 +0200 Subject: [PATCH] Add support for ingest pipelines --- README.md | 28 ++++ .../tools/ElasticsearchBeyonder.java | 8 ++ .../elasticsearch/tools/SettingsFinder.java | 1 + .../tools/index/IndexFinder.java | 2 +- .../PipelineElasticsearchUpdater.java | 121 ++++++++++++++++++ .../tools/pipeline/PipelineFinder.java | 64 +++++++++ .../pipeline/PipelineSettingsReader.java | 58 +++++++++ .../elasticsearch/tools/BeyonderRestIT.java | 8 ++ .../pipeline/_pipeline/twitter_pipeline.json | 11 ++ 9 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java create mode 100644 src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java create mode 100644 src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java create mode 100644 src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json diff --git a/README.md b/README.md index 9328682..b633e02 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,34 @@ By default, Beyonder will not overwrite a template if it already exists. This can be overridden by setting `force` to `true` in the expanded factory method `ElasticsearchBeyonder.start()`. +Managing pipelines +------------------ + +A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared while +documents are being indexed. Please note that this feature is only supported when you use the REST client not the Transport client. 
+ +For example, to set one field's value based on another field by using a Set Processor, you can add a file named `elasticsearch/_pipeline/set_field_processor.json` +in your project: + +```javascript +{ + "description" : "Twitter pipeline", + "processors" : [ + { + "set" : { + "field": "copy", + "value": "{{otherField}}" + } + } + ] +} +``` + +By default, Beyonder will not overwrite a pipeline if it already exists. +This can be overridden by setting `force` to `true` in the expanded factory method +`ElasticsearchBeyonder.start()`. + + Why this name? ============== diff --git a/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java b/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java index 37a9384..ea15e92 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java @@ -22,6 +22,7 @@ import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults; import fr.pilato.elasticsearch.tools.index.IndexFinder; import fr.pilato.elasticsearch.tools.template.TemplateFinder; +import fr.pilato.elasticsearch.tools.pipeline.PipelineFinder; import org.elasticsearch.client.Client; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; @@ -33,6 +34,7 @@ import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.createIndex; import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.updateSettings; import static fr.pilato.elasticsearch.tools.template.TemplateElasticsearchUpdater.createTemplate; +import static fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater.createPipeline; /** * By default, indexes are created with their default Elasticsearch settings. 
You can specify @@ -109,6 +111,12 @@ public static void start(RestClient client, String root, boolean merge, boolean createTemplate(client, root, templateName, force); } + // create pipelines + List pipelineNames = PipelineFinder.findPipelines(root); + for (String pipelineName : pipelineNames) { + createPipeline(client, root, pipelineName, force); + } + // create indices Collection indexNames = IndexFinder.findIndexNames(root); for (String indexName : indexNames) { diff --git a/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java index c9921d1..e43083b 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java @@ -46,6 +46,7 @@ public static class Defaults { public static String IndexSettingsFileName = "_settings.json"; public static String UpdateIndexSettingsFileName = "_update_settings.json"; public static String TemplateDir = "_template"; + public static String PipelineDir = "_pipeline"; /** * Default setting of whether or not to merge mappings on start. 
diff --git a/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java
index be5ae60..0abc3d9 100644
--- a/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java
+++ b/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java
@@ -70,7 +70,7 @@ public static List findIndexNames(final String root) throws IOException,
                 } else {
                     key = resource;
                 }
-                if (!key.equals(Defaults.TemplateDir) && !keys.contains(key)) {
+                if (!key.equals(Defaults.TemplateDir) && !key.equals(Defaults.PipelineDir) && !keys.contains(key)) {
                     logger.trace(" - found [{}].", key);
                     keys.add(key);
                     indexNames.add(key);
diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java
new file mode 100644
index 0000000..19d5076
--- /dev/null
+++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java
@@ -0,0 +1,121 @@
+package fr.pilato.elasticsearch.tools.pipeline;
+
+import java.io.IOException;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates ingest pipelines in Elasticsearch through the REST client.
+ *
+ * @author hjk181
+ *
+ */
+public class PipelineElasticsearchUpdater {
+
+    private static final Logger logger = LoggerFactory.getLogger(PipelineElasticsearchUpdater.class);
+
+    /**
+     * Create a pipeline in Elasticsearch, reading its JSON definition from the classpath.
+     *
+     * @param client Elasticsearch client
+     * @param root dir within the classpath
+     * @param pipeline the id of the pipeline
+     * @param force set it to true to overwrite the pipeline if it already exists
+     * @throws Exception if something goes wrong
+     */
+    public static void createPipeline(RestClient client, String root, String pipeline, boolean force) throws Exception {
+        String json = PipelineSettingsReader.readPipeline(root, pipeline);
+        createPipelineWithJson(client, pipeline, json, force);
+    }
+
+    /**
+     * Create a pipeline in Elasticsearch. Read content from default classpath dir.
+     *
+     * @param client Elasticsearch client
+     * @param pipeline the id of the pipeline
+     * @param force set it to true to overwrite the pipeline if it already exists
+     * @throws Exception if something goes wrong
+     */
+    public static void createPipeline(RestClient client, String pipeline, boolean force) throws Exception {
+        String json = PipelineSettingsReader.readPipeline(pipeline);
+        createPipelineWithJson(client, pipeline, json, force);
+    }
+
+    /**
+     * Create a new pipeline in Elasticsearch. An existing pipeline is only replaced when force is set.
+     *
+     * @param client Elasticsearch client
+     * @param pipeline the id of the pipeline
+     * @param json JSon content for the pipeline
+     * @param force set it to true to overwrite the pipeline if it already exists
+     * @throws Exception if something goes wrong
+     */
+    public static void createPipelineWithJson(RestClient client, String pipeline, String json, boolean force) throws Exception {
+        // Ask Elasticsearch only once; the answer drives both the log message and the decision.
+        boolean exists = isPipelineExist(client, pipeline);
+        if (exists && !force) {
+            logger.debug("Pipeline [{}] already exists.", pipeline);
+            return;
+        }
+
+        if (exists) {
+            logger.debug("Pipeline [{}] already exists. Force is set. Overriding it.", pipeline);
+        }
+        else {
+            logger.debug("Pipeline [{}] doesn't exist. Creating it.", pipeline);
+        }
+        createPipelineWithJsonInElasticsearch(client, pipeline, json);
+    }
+
+    /**
+     * Send the pipeline definition to Elasticsearch with a PUT on /_ingest/pipeline/{id}.
+     *
+     * @param client Elasticsearch client
+     * @param pipeline the id of the pipeline
+     * @param json JSon content for the pipeline
+     * @throws Exception if Elasticsearch does not answer with a 200 status
+     */
+    private static void createPipelineWithJsonInElasticsearch(RestClient client, String pipeline, String json) throws Exception {
+        logger.trace("createPipeline([{}])", pipeline);
+
+        assert client != null;
+        assert pipeline != null;
+
+        Request request = new Request("PUT", "/_ingest/pipeline/" + pipeline);
+        request.setJsonEntity(json);
+        Response response = client.performRequest(request);
+
+        if (response.getStatusLine().getStatusCode() != 200) {
+            logger.warn("Could not create pipeline [{}]", pipeline);
+            throw new Exception("Could not create pipeline [" + pipeline + "].");
+        }
+
+        logger.trace("/createPipeline([{}])", pipeline);
+    }
+
+    /**
+     * Check if a pipeline exists
+     *
+     * @param client Elasticsearch client
+     * @param pipeline the id of the pipeline
+     * @return true if the pipeline exists
+     * @throws IOException if something goes wrong
+     */
+    public static boolean isPipelineExist(RestClient client, String pipeline) throws IOException {
+        try {
+            Response response = client.performRequest(new Request("GET", "/_ingest/pipeline/" + pipeline));
+            return response.getEntity() != null;
+        } catch (ResponseException e) {
+            // A 404 only means the pipeline is absent; any other error status is propagated.
+            if (404 != e.getResponse().getStatusLine().getStatusCode()) {
+                throw e;
+            }
+        }
+        return false;
+    }
+}
diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java
new file mode 100644
index 0000000..fe850d7
--- /dev/null
+++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java
@@ -0,0 +1,64 @@
+package fr.pilato.elasticsearch.tools.pipeline;
+
+import java.io.IOException;
+import
java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import fr.pilato.elasticsearch.tools.ResourceList;
+import fr.pilato.elasticsearch.tools.SettingsFinder;
+import fr.pilato.elasticsearch.tools.template.TemplateFinder;
+
+/**
+ * Finds ingest pipelines on the classpath.
+ *
+ * @author hjk181
+ *
+ */
+public class PipelineFinder extends SettingsFinder {
+
+    private static final Logger logger = LoggerFactory.getLogger(PipelineFinder.class);
+
+    /**
+     * Find all pipelines in default classpath dir
+     *
+     * @return a list of pipeline ids (file names without the JSON extension)
+     * @throws IOException if the classpath resources cannot be listed
+     * @throws URISyntaxException this should not happen
+     */
+    public static List<String> findPipelines() throws IOException, URISyntaxException {
+        return findPipelines(Defaults.ConfigDir);
+    }
+
+    /**
+     * Find all pipelines
+     *
+     * @param root dir within the classpath; when null the default classpath dir is used
+     * @return a list of pipeline ids (file names without the JSON extension)
+     * @throws IOException if the classpath resources cannot be listed
+     * @throws URISyntaxException this should not happen
+     */
+    public static List<String> findPipelines(String root) throws IOException, URISyntaxException {
+        if (root == null) {
+            return findPipelines();
+        }
+
+        logger.debug("Looking for pipelines in classpath under [{}].", root);
+        final List<String> pipelineNames = new ArrayList<>();
+        String[] resources = ResourceList.getResources(root + "/" + Defaults.PipelineDir + "/"); // "es/_pipeline/"
+        for (String resource : resources) {
+            String withoutIndex = resource.substring(resource.indexOf("/") + 1);
+            int extensionIndex = withoutIndex.indexOf(Defaults.JsonFileExtension);
+            // Skip blank entries and files without the JSON extension instead of failing in substring().
+            if (extensionIndex > 0) {
+                String pipeline = withoutIndex.substring(0, extensionIndex);
+                logger.trace(" - found [{}].", pipeline);
+                pipelineNames.add(pipeline);
+            }
+        }
+        return pipelineNames;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java
b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java new file mode 100644 index 0000000..28db404 --- /dev/null +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java @@ -0,0 +1,58 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package fr.pilato.elasticsearch.tools.pipeline; + +import java.io.IOException; + +import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults; +import fr.pilato.elasticsearch.tools.SettingsReader; + +/** + * Manage elasticsearch pipeline files. 
+ * + * @author hjk181 + */ +public class PipelineSettingsReader extends SettingsReader { + + /** + * Read a pipeline definition from the classpath (a null root falls back to the default classpath dir) + * @param root dir within the classpath + * @param pipeline the id of the pipeline (.json will be appended) + * @return The pipeline content + * @throws IOException if the pipeline file cannot be read from the classpath + */ + public static String readPipeline(String root, String pipeline) throws IOException { + if (root == null) { + return readPipeline(pipeline); + } + String settingsFile = root + "/" + Defaults.PipelineDir + "/" + pipeline + Defaults.JsonFileExtension; + return readFileFromClasspath(settingsFile); + } + + /** + * Read a pipeline in default classpath dir + * @param pipeline the id of the pipeline (.json will be appended) + * @return The pipeline content + * @throws IOException if the pipeline file cannot be read from the classpath + */ + public static String readPipeline(String pipeline) throws IOException { + return readPipeline(Defaults.ConfigDir, pipeline); + } +} \ No newline at end of file diff --git a/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java b/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java index 6532847..e74b5d3 100644 --- a/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java +++ b/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java @@ -21,6 +21,8 @@ import fr.pilato.elasticsearch.tools.alias.AliasElasticsearchUpdater; import fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater; +import fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.elasticsearch.client.Request; @@ -142,6 +144,12 @@ public void testForceEnabled() throws Exception { String newMapping = getMapping("twitter"); assertThat(newMapping, is(not(oldMapping))); } + + @Test + public void testPipeline() throws Exception { + ElasticsearchBeyonder.start(client, "models/pipeline"); + 
assertThat(PipelineElasticsearchUpdater.isPipelineExist(client, "twitter_pipeline"), is(true)); + } private String getMapping(String indexName) throws IOException { HttpEntity response = client.performRequest(new Request("GET", indexName + "/_mapping")).getEntity(); diff --git a/src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json b/src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json new file mode 100644 index 0000000..60b74ea --- /dev/null +++ b/src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json @@ -0,0 +1,11 @@ +{ + "description" : "Twitter pipeline", + "processors" : [ + { + "set" : { + "field": "foo", + "value": "bar" + } + } + ] +} \ No newline at end of file