From 66ee19333438b2c8108cb27b6490292207cd4701 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2020 04:18:11 +0000 Subject: [PATCH 1/9] Bump maven-failsafe-plugin from 3.0.0-M4 to 3.0.0-M5 Bumps [maven-failsafe-plugin](https://github.com/apache/maven-surefire) from 3.0.0-M4 to 3.0.0-M5. - [Release notes](https://github.com/apache/maven-surefire/releases) - [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.0.0-M4...surefire-3.0.0-M5) Signed-off-by: dependabot-preview[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index dd31ef4..8be83cb 100644 --- a/pom.xml +++ b/pom.xml @@ -266,7 +266,7 @@ when an elasticsearch client starts. org.apache.maven.plugins maven-failsafe-plugin - 3.0.0-M4 + 3.0.0-M5 ${skipUnitTests} From b215f623b7cbeb6a0d5b2ecaa528cf40e11f2f8a Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2020 04:17:38 +0000 Subject: [PATCH 3/9] Bump elasticsearch.version from 7.7.1 to 7.8.0 Bumps `elasticsearch.version` from 7.7.1 to 7.8.0. Updates `transport` from 7.7.1 to 7.8.0 - [Release notes](https://github.com/elastic/elasticsearch/releases) - [Commits](https://github.com/elastic/elasticsearch/compare/v7.7.1...v7.8.0) Updates `elasticsearch-rest-client` from 7.7.1 to 7.8.0 - [Release notes](https://github.com/elastic/elasticsearch/releases) - [Commits](https://github.com/elastic/elasticsearch/compare/v7.7.1...v7.8.0) Signed-off-by: dependabot-preview[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f1cf1cb..b950654 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ when an elasticsearch client starts. - 7.7.1 + 7.8.0 ${elasticsearch.version} 1.7.30 2.13.3 From 531789e1c6a14b20574c4e48e03604bfa90180eb Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Thu, 25 Jun 2020 04:20:37 +0000 Subject: [PATCH 4/9] Bump jackson.version from 2.11.0 to 2.11.1 Bumps `jackson.version` from 2.11.0 to 2.11.1. Updates `jackson-core` from 2.11.0 to 2.11.1 - [Release notes](https://github.com/FasterXML/jackson-core/releases) - [Commits](https://github.com/FasterXML/jackson-core/compare/jackson-core-2.11.0...jackson-core-2.11.1) Updates `jackson-databind` from 2.11.0 to 2.11.1 - [Release notes](https://github.com/FasterXML/jackson/releases) - [Commits](https://github.com/FasterXML/jackson/commits) Signed-off-by: dependabot-preview[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b950654..742bc74 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ when an elasticsearch client starts. 1.7.30 2.13.3 2.7 - 2.11.0 + 2.11.1 docker.elastic.co/elasticsearch/elasticsearch From 70ee48ed2de0991c68b66e3ece261ca1e82a6692 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 27 Jul 2020 04:17:41 +0000 Subject: [PATCH 5/9] Bump commons-text from 1.8 to 1.9 Bumps commons-text from 1.8 to 1.9. Signed-off-by: dependabot-preview[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 742bc74..06d037e 100644 --- a/pom.xml +++ b/pom.xml @@ -165,7 +165,7 @@ when an elasticsearch client starts. org.apache.commons commons-text - 1.8 + 1.9 From 504e0d130083084475c70a2286bf14bd40e0d063 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Tue, 28 Jul 2020 04:18:40 +0000 Subject: [PATCH 6/9] Bump elasticsearch.version from 7.8.0 to 7.8.1 Bumps `elasticsearch.version` from 7.8.0 to 7.8.1. Updates `transport` from 7.8.0 to 7.8.1 - [Release notes](https://github.com/elastic/elasticsearch/releases) - [Commits](https://github.com/elastic/elasticsearch/compare/v7.8.0...v7.8.1) Updates `elasticsearch-rest-client` from 7.8.0 to 7.8.1 - [Release notes](https://github.com/elastic/elasticsearch/releases) - [Commits](https://github.com/elastic/elasticsearch/compare/v7.8.0...v7.8.1) Signed-off-by: dependabot-preview[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 06d037e..3251c66 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ when an elasticsearch client starts. - 7.8.0 + 7.8.1 ${elasticsearch.version} 1.7.30 2.13.3 From 5e389f94aa4261c54190de553896d6f25c7d9187 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 3 Aug 2020 04:21:14 +0000 Subject: [PATCH 7/9] Bump jackson.version from 2.11.1 to 2.11.2 Bumps `jackson.version` from 2.11.1 to 2.11.2. Updates `jackson-core` from 2.11.1 to 2.11.2 - [Release notes](https://github.com/FasterXML/jackson-core/releases) - [Commits](https://github.com/FasterXML/jackson-core/compare/jackson-core-2.11.1...jackson-core-2.11.2) Updates `jackson-databind` from 2.11.1 to 2.11.2 - [Release notes](https://github.com/FasterXML/jackson/releases) - [Commits](https://github.com/FasterXML/jackson/commits) Signed-off-by: dependabot-preview[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3251c66..1f805f9 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ when an elasticsearch client starts. 1.7.30 2.13.3 2.7 - 2.11.1 + 2.11.2 docker.elastic.co/elasticsearch/elasticsearch From 796645ba8ed52dbd0c17ba74de3be0255c498247 Mon Sep 17 00:00:00 2001 From: HJK181 Date: Thu, 6 Aug 2020 15:05:38 +0200 Subject: [PATCH 8/9] Add support for ingest pipelines --- README.md | 28 ++++ .../tools/ElasticsearchBeyonder.java | 8 ++ .../elasticsearch/tools/SettingsFinder.java | 1 + .../tools/index/IndexFinder.java | 2 +- .../PipelineElasticsearchUpdater.java | 121 ++++++++++++++++++ .../tools/pipeline/PipelineFinder.java | 64 +++++++++ .../pipeline/PipelineSettingsReader.java | 58 +++++++++ .../elasticsearch/tools/BeyonderRestIT.java | 8 ++ .../pipeline/_pipeline/twitter_pipeline.json | 11 ++ 9 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java create mode 100644 src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java create mode 100644 src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java create mode 100644 src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json diff --git a/README.md b/README.md index 9328682..b633e02 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,34 @@ By default, Beyonder will not overwrite a template if it already exists. This can be overridden by setting `force` to `true` in the expanded factory method `ElasticsearchBeyonder.start()`. +Managing pipelines +------------------ + +A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared while +documents are being indexed. Please note that this feature is only supported when you use the REST client not the Transport client. + +For example, setting one fields value based on another field by using an Set Processor you an add a file named `elasticsearch/_pipeline/set_field_processor` +in your project: + +```javascript +{ + "description" : "Twitter pipeline", + "processors" : [ + { + "set" : { + "field": "copy", + "value": "{{otherField}}" + } + } + ] +} +``` + +By default, Beyonder will not overwrite a pipeline if it already exists. +This can be overridden by setting `force` to `true` in the expanded factory method +`ElasticsearchBeyonder.start()`. + + Why this name? ============== diff --git a/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java b/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java index 37a9384..ea15e92 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java @@ -22,6 +22,7 @@ import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults; import fr.pilato.elasticsearch.tools.index.IndexFinder; import fr.pilato.elasticsearch.tools.template.TemplateFinder; +import fr.pilato.elasticsearch.tools.pipeline.PipelineFinder; import org.elasticsearch.client.Client; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; @@ -33,6 +34,7 @@ import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.createIndex; import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.updateSettings; import static fr.pilato.elasticsearch.tools.template.TemplateElasticsearchUpdater.createTemplate; +import static fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater.createPipeline; /** * By default, indexes are created with their default Elasticsearch settings. You can specify @@ -109,6 +111,12 @@ public static void start(RestClient client, String root, boolean merge, boolean createTemplate(client, root, templateName, force); } + // create pipelines + List pipelineNames = PipelineFinder.findPipelines(root); + for (String pipelineName : pipelineNames) { + createPipeline(client, root, pipelineName, force); + } + // create indices Collection indexNames = IndexFinder.findIndexNames(root); for (String indexName : indexNames) { diff --git a/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java index c9921d1..e43083b 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java @@ -46,6 +46,7 @@ public static class Defaults { public static String IndexSettingsFileName = "_settings.json"; public static String UpdateIndexSettingsFileName = "_update_settings.json"; public static String TemplateDir = "_template"; + public static String PipelineDir = "_pipeline"; /** * Default setting of whether or not to merge mappings on start. diff --git a/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java index be5ae60..0abc3d9 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java @@ -70,7 +70,7 @@ public static List findIndexNames(final String root) throws IOException, } else { key = resource; } - if (!key.equals(Defaults.TemplateDir) && !keys.contains(key)) { + if (!key.equals(Defaults.TemplateDir) && !key.equals(Defaults.PipelineDir) && !keys.contains(key)) { logger.trace(" - found [{}].", key); keys.add(key); indexNames.add(key); diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java new file mode 100644 index 0000000..19d5076 --- /dev/null +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java @@ -0,0 +1,121 @@ +package fr.pilato.elasticsearch.tools.pipeline; + +import java.io.IOException; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles ingest pipeline creation. + * + * @author hjk181 + * + */ +public class PipelineElasticsearchUpdater { + + private static final Logger logger = LoggerFactory.getLogger(PipelineElasticsearchUpdater.class); + + /** + * Create a pipeline in Elasticsearch. + * + * @param client Elasticsearch client + * @param root dir within the classpath + * @param pipeline the id of the pipeline + * @param force set it to true if you want to force cleaning pipeline before adding it + * @throws Exception if something goes wrong + */ + public static void createPipeline(RestClient client, String root, String pipeline, boolean force) throws Exception { + String json = PipelineSettingsReader.readPipeline(root, pipeline); + createPipelineWithJson(client, pipeline, json, force); + } + + /** + * Create a pipeline in Elasticsearch. Read content from default classpath dir. + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @param force set it to true if you want to force cleaning pipeline before adding it + * @throws Exception if something goes wrong + */ + public static void createPipeline(RestClient client, String pipeline, boolean force) throws Exception { + String json = PipelineSettingsReader.readPipeline(pipeline); + createPipelineWithJson(client, pipeline, json, force); + } + + /** + * Create a new pipeline in Elasticsearch + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @param json JSon content for the pipeline + * @param force set it to true if you want to force cleaning pipeline before adding it + * @throws Exception if something goes wrong + */ + public static void createPipelineWithJson(RestClient client, String pipeline, String json, boolean force) throws Exception { + if (isPipelineExist(client, pipeline)) { + if (force) { + logger.debug("Pipeline [{}] already exists. Force is set. Overriding it.", pipeline); + createPipelineWithJsonInElasticsearch(client, pipeline, json); + } + else { + logger.debug("Pipeline [{}] already exists.", pipeline); + } + } + + if (!isPipelineExist(client, pipeline)) { + logger.debug("Pipeline [{}] doesn't exist. Creating it.", pipeline); + createPipelineWithJsonInElasticsearch(client, pipeline, json); + } + } + + /** + * Create a new pipeline in Elasticsearch. + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @param json JSon content for the pipeline + * @throws Exception if something goes wrong + */ + private static void createPipelineWithJsonInElasticsearch(RestClient client, String pipeline, String json) throws Exception { + logger.trace("createPipeline([{}])", pipeline); + + assert client != null; + assert pipeline != null; + + Request request = new Request("PUT", "/_ingest/pipeline/" + pipeline); + request.setJsonEntity(json); + Response response = client.performRequest(request); + + if (response.getStatusLine().getStatusCode() != 200) { + logger.warn("Could not create pipeline [{}]", pipeline); + throw new Exception("Could not create pipeline [" + pipeline + "]."); + } + + logger.trace("/createPipeline([{}])", pipeline); + } + + /** + * Check if a pipeline exists + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @return true if the pipeline exists + * @throws IOException if something goes wrong + */ + public static boolean isPipelineExist(RestClient client, String pipeline) throws IOException { + try { + Response response = client.performRequest(new Request("GET", "/_ingest/pipeline/" + pipeline)); + return response.getEntity() != null; + } + catch (ResponseException e) { + if (404 != e.getResponse().getStatusLine().getStatusCode()) { + throw e; + } + } + return false; + } +} diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java new file mode 100644 index 0000000..fe850d7 --- /dev/null +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java @@ -0,0 +1,64 @@ +package fr.pilato.elasticsearch.tools.pipeline; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import fr.pilato.elasticsearch.tools.ResourceList; +import fr.pilato.elasticsearch.tools.SettingsFinder; +import fr.pilato.elasticsearch.tools.template.TemplateFinder; + +/** + * Findes ingest pipelines on the classpath. + * + * @author hjk181 + * + */ +public class PipelineFinder extends SettingsFinder { + + private static final Logger logger = LoggerFactory.getLogger(TemplateFinder.class); + + /** + * Find all pipelines in default classpath dir + * + * @return a list of pipelines + * @throws IOException if connection with elasticsearch is failing + * @throws URISyntaxException this should not happen + */ + public static List findPipelines() throws IOException, URISyntaxException { + return findPipelines(Defaults.ConfigDir); + } + + /** + * Find all pipelines + * + * @param root dir within the classpath + * @return a list of pipelines + * @throws IOException if connection with elasticsearch is failing + * @throws URISyntaxException this should not happen + */ + public static List findPipelines(String root) throws IOException, URISyntaxException { + if (root == null) { + return findPipelines(); + } + + logger.debug("Looking for pipelines in classpath under [{}].", root); + + final List pipelineNames = new ArrayList<>(); + String[] resources = ResourceList.getResources(root + "/" + Defaults.PipelineDir + "/"); // "es/_pipeline/" + for (String resource : resources) { + if (!resource.isEmpty()) { + String withoutIndex = resource.substring(resource.indexOf("/") + 1); + String pipeline = withoutIndex.substring(0, withoutIndex.indexOf(Defaults.JsonFileExtension)); + logger.trace(" - found [{}].", pipeline); + pipelineNames.add(pipeline); + } + } + + return pipelineNames; + } +} \ No newline at end of file diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java new file mode 100644 index 0000000..28db404 --- /dev/null +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java @@ -0,0 +1,58 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package fr.pilato.elasticsearch.tools.pipeline; + +import java.io.IOException; + +import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults; +import fr.pilato.elasticsearch.tools.SettingsReader; + +/** + * Manage elasticsearch pipeline files. + * + * @author hjk181 + */ +public class PipelineSettingsReader extends SettingsReader { + + /** + * Read a pipeline + * @param root dir within the classpath + * @param pipeline the id of the pipeline (.json will be appended) + * @return The pipeline content + * @throws IOException if the connection with elasticsearch is failing + */ + public static String readPipeline(String root, String pipeline) throws IOException { + if (root == null) { + return readPipeline(pipeline); + } + String settingsFile = root + "/" + Defaults.PipelineDir + "/" + pipeline + Defaults.JsonFileExtension; + return readFileFromClasspath(settingsFile); + } + + /** + * Read a pipeline in default classpath dir + * @param pipeline the id of the pipeline (.json will be appended) + * @return The pipeline content + * @throws IOException if the connection with elasticsearch is failing + */ + public static String readPipeline(String pipeline) throws IOException { + return readPipeline(Defaults.ConfigDir, pipeline); + } +} \ No newline at end of file diff --git a/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java b/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java index 6532847..e74b5d3 100644 --- a/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java +++ b/src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java @@ -21,6 +21,8 @@ import fr.pilato.elasticsearch.tools.alias.AliasElasticsearchUpdater; import fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater; +import fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.elasticsearch.client.Request; @@ -142,6 +144,12 @@ public void testForceEnabled() throws Exception { String newMapping = getMapping("twitter"); assertThat(newMapping, is(not(oldMapping))); } + + @Test + public void testPipeline() throws Exception { + ElasticsearchBeyonder.start(client, "models/pipeline"); + assertThat(PipelineElasticsearchUpdater.isPipelineExist(client, "twitter_pipeline"), is(true)); + } private String getMapping(String indexName) throws IOException { HttpEntity response = client.performRequest(new Request("GET", indexName + "/_mapping")).getEntity(); diff --git a/src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json b/src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json new file mode 100644 index 0000000..60b74ea --- /dev/null +++ b/src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json @@ -0,0 +1,11 @@ +{ + "description" : "Twitter pipeline", + "processors" : [ + { + "set" : { + "field": "foo", + "value": "bar" + } + } + ] +} \ No newline at end of file From efb8c567c6fdf58f0572e96b00b1e40e2a2722df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Nystr=C3=B6m?= Date: Mon, 31 May 2021 09:42:50 +0200 Subject: [PATCH 9/9] spaces vs tabs add license to file header --- .../PipelineElasticsearchUpdater.java | 201 ++++++++++-------- .../tools/pipeline/PipelineFinder.java | 91 ++++---- .../pipeline/PipelineSettingsReader.java | 46 ++-- 3 files changed, 188 insertions(+), 150 deletions(-) diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java index 19d5076..4d93c55 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineElasticsearchUpdater.java @@ -1,3 +1,22 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package fr.pilato.elasticsearch.tools.pipeline; import java.io.IOException; @@ -17,105 +36,105 @@ */ public class PipelineElasticsearchUpdater { - private static final Logger logger = LoggerFactory.getLogger(PipelineElasticsearchUpdater.class); + private static final Logger logger = LoggerFactory.getLogger(PipelineElasticsearchUpdater.class); - /** - * Create a pipeline in Elasticsearch. - * - * @param client Elasticsearch client - * @param root dir within the classpath - * @param pipeline the id of the pipeline - * @param force set it to true if you want to force cleaning pipeline before adding it - * @throws Exception if something goes wrong - */ - public static void createPipeline(RestClient client, String root, String pipeline, boolean force) throws Exception { - String json = PipelineSettingsReader.readPipeline(root, pipeline); - createPipelineWithJson(client, pipeline, json, force); - } + /** + * Create a pipeline in Elasticsearch. + * + * @param client Elasticsearch client + * @param root dir within the classpath + * @param pipeline the id of the pipeline + * @param force set it to true if you want to force cleaning pipeline before adding it + * @throws Exception if something goes wrong + */ + public static void createPipeline(RestClient client, String root, String pipeline, boolean force) throws Exception { + String json = PipelineSettingsReader.readPipeline(root, pipeline); + createPipelineWithJson(client, pipeline, json, force); + } - /** - * Create a pipeline in Elasticsearch. Read content from default classpath dir. - * - * @param client Elasticsearch client - * @param pipeline the id of the pipeline - * @param force set it to true if you want to force cleaning pipeline before adding it - * @throws Exception if something goes wrong - */ - public static void createPipeline(RestClient client, String pipeline, boolean force) throws Exception { - String json = PipelineSettingsReader.readPipeline(pipeline); - createPipelineWithJson(client, pipeline, json, force); - } + /** + * Create a pipeline in Elasticsearch. Read content from default classpath dir. + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @param force set it to true if you want to force cleaning pipeline before adding it + * @throws Exception if something goes wrong + */ + public static void createPipeline(RestClient client, String pipeline, boolean force) throws Exception { + String json = PipelineSettingsReader.readPipeline(pipeline); + createPipelineWithJson(client, pipeline, json, force); + } - /** - * Create a new pipeline in Elasticsearch - * - * @param client Elasticsearch client - * @param pipeline the id of the pipeline - * @param json JSon content for the pipeline - * @param force set it to true if you want to force cleaning pipeline before adding it - * @throws Exception if something goes wrong - */ - public static void createPipelineWithJson(RestClient client, String pipeline, String json, boolean force) throws Exception { - if (isPipelineExist(client, pipeline)) { - if (force) { - logger.debug("Pipeline [{}] already exists. Force is set. Overriding it.", pipeline); - createPipelineWithJsonInElasticsearch(client, pipeline, json); - } - else { - logger.debug("Pipeline [{}] already exists.", pipeline); - } - } + /** + * Create a new pipeline in Elasticsearch + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @param json JSon content for the pipeline + * @param force set it to true if you want to force cleaning pipeline before adding it + * @throws Exception if something goes wrong + */ + public static void createPipelineWithJson(RestClient client, String pipeline, String json, boolean force) throws Exception { + if (isPipelineExist(client, pipeline)) { + if (force) { + logger.debug("Pipeline [{}] already exists. Force is set. Overriding it.", pipeline); + createPipelineWithJsonInElasticsearch(client, pipeline, json); + } + else { + logger.debug("Pipeline [{}] already exists.", pipeline); + } + } - if (!isPipelineExist(client, pipeline)) { - logger.debug("Pipeline [{}] doesn't exist. Creating it.", pipeline); - createPipelineWithJsonInElasticsearch(client, pipeline, json); - } - } + if (!isPipelineExist(client, pipeline)) { + logger.debug("Pipeline [{}] doesn't exist. Creating it.", pipeline); + createPipelineWithJsonInElasticsearch(client, pipeline, json); + } + } - /** - * Create a new pipeline in Elasticsearch. - * - * @param client Elasticsearch client - * @param pipeline the id of the pipeline - * @param json JSon content for the pipeline - * @throws Exception if something goes wrong - */ - private static void createPipelineWithJsonInElasticsearch(RestClient client, String pipeline, String json) throws Exception { - logger.trace("createPipeline([{}])", pipeline); + /** + * Create a new pipeline in Elasticsearch. + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @param json JSon content for the pipeline + * @throws Exception if something goes wrong + */ + private static void createPipelineWithJsonInElasticsearch(RestClient client, String pipeline, String json) throws Exception { + logger.trace("createPipeline([{}])", pipeline); - assert client != null; - assert pipeline != null; + assert client != null; + assert pipeline != null; - Request request = new Request("PUT", "/_ingest/pipeline/" + pipeline); - request.setJsonEntity(json); - Response response = client.performRequest(request); + Request request = new Request("PUT", "/_ingest/pipeline/" + pipeline); + request.setJsonEntity(json); + Response response = client.performRequest(request); - if (response.getStatusLine().getStatusCode() != 200) { - logger.warn("Could not create pipeline [{}]", pipeline); - throw new Exception("Could not create pipeline [" + pipeline + "]."); - } + if (response.getStatusLine().getStatusCode() != 200) { + logger.warn("Could not create pipeline [{}]", pipeline); + throw new Exception("Could not create pipeline [" + pipeline + "]."); + } - logger.trace("/createPipeline([{}])", pipeline); - } + logger.trace("/createPipeline([{}])", pipeline); + } - /** - * Check if a pipeline exists - * - * @param client Elasticsearch client - * @param pipeline the id of the pipeline - * @return true if the pipeline exists - * @throws IOException if something goes wrong - */ - public static boolean isPipelineExist(RestClient client, String pipeline) throws IOException { - try { - Response response = client.performRequest(new Request("GET", "/_ingest/pipeline/" + pipeline)); - return response.getEntity() != null; - } - catch (ResponseException e) { - if (404 != e.getResponse().getStatusLine().getStatusCode()) { - throw e; - } - } - return false; - } + /** + * Check if a pipeline exists + * + * @param client Elasticsearch client + * @param pipeline the id of the pipeline + * @return true if the pipeline exists + * @throws IOException if something goes wrong + */ + public static boolean isPipelineExist(RestClient client, String pipeline) throws IOException { + try { + Response response = client.performRequest(new Request("GET", "/_ingest/pipeline/" + pipeline)); + return response.getEntity() != null; + } + catch (ResponseException e) { + if (404 != e.getResponse().getStatusLine().getStatusCode()) { + throw e; + } + } + return false; + } } diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java index fe850d7..578940d 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineFinder.java @@ -1,3 +1,22 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package fr.pilato.elasticsearch.tools.pipeline; import java.io.IOException; @@ -20,45 +39,45 @@ */ public class PipelineFinder extends SettingsFinder { - private static final Logger logger = LoggerFactory.getLogger(TemplateFinder.class); + private static final Logger logger = LoggerFactory.getLogger(TemplateFinder.class); - /** - * Find all pipelines in default classpath dir - * - * @return a list of pipelines - * @throws IOException if connection with elasticsearch is failing - * @throws URISyntaxException this should not happen - */ - public static List findPipelines() throws IOException, URISyntaxException { - return findPipelines(Defaults.ConfigDir); - } + /** + * Find all pipelines in default classpath dir + * + * @return a list of pipelines + * @throws IOException if connection with elasticsearch is failing + * @throws URISyntaxException this should not happen + */ + public static List findPipelines() throws IOException, URISyntaxException { + return findPipelines(Defaults.ConfigDir); + } - /** - * Find all pipelines - * - * @param root dir within the classpath - * @return a list of pipelines - * @throws IOException if connection with elasticsearch is failing - * @throws URISyntaxException this should not happen - */ - public static List findPipelines(String root) throws IOException, URISyntaxException { - if (root == null) { - return findPipelines(); - } + /** + * Find all pipelines + * + * @param root dir within the classpath + * @return a list of pipelines + * @throws IOException if connection with elasticsearch is failing + * @throws URISyntaxException this should not happen + */ + public static List findPipelines(String root) throws IOException, URISyntaxException { + if (root == null) { + return findPipelines(); + } - logger.debug("Looking for pipelines in classpath under [{}].", root); + logger.debug("Looking for pipelines in classpath under [{}].", root); - final List pipelineNames = new ArrayList<>(); - String[] resources = ResourceList.getResources(root + "/" + Defaults.PipelineDir + "/"); // "es/_pipeline/" - for (String resource : resources) { - if (!resource.isEmpty()) { - String withoutIndex = resource.substring(resource.indexOf("/") + 1); - String pipeline = withoutIndex.substring(0, withoutIndex.indexOf(Defaults.JsonFileExtension)); - logger.trace(" - found [{}].", pipeline); - pipelineNames.add(pipeline); - } - } + final List pipelineNames = new ArrayList<>(); + String[] resources = ResourceList.getResources(root + "/" + Defaults.PipelineDir + "/"); // "es/_pipeline/" + for (String resource : resources) { + if (!resource.isEmpty()) { + String withoutIndex = resource.substring(resource.indexOf("/") + 1); + String pipeline = withoutIndex.substring(0, withoutIndex.indexOf(Defaults.JsonFileExtension)); + logger.trace(" - found [{}].", pipeline); + pipelineNames.add(pipeline); + } + } - return pipelineNames; - } + return pipelineNames; + } } \ No newline at end of file diff --git a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java index 28db404..b793c7c 100644 --- a/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java +++ b/src/main/java/fr/pilato/elasticsearch/tools/pipeline/PipelineSettingsReader.java @@ -31,28 +31,28 @@ */ public class PipelineSettingsReader extends SettingsReader { - /** - * Read a pipeline - * @param root dir within the classpath - * @param pipeline the id of the pipeline (.json will be appended) - * @return The pipeline content - * @throws IOException if the connection with elasticsearch is failing - */ - public static String readPipeline(String root, String pipeline) throws IOException { - if (root == null) { - return readPipeline(pipeline); - } - String settingsFile = root + "/" + Defaults.PipelineDir + "/" + pipeline + Defaults.JsonFileExtension; - return readFileFromClasspath(settingsFile); - } + /** + * Read a pipeline + * @param root dir within the classpath + * @param pipeline the id of the pipeline (.json will be appended) + * @return The pipeline content + * @throws IOException if the connection with elasticsearch is failing + */ + public static String readPipeline(String root, String pipeline) throws IOException { + if (root == null) { + return readPipeline(pipeline); + } + String settingsFile = root + "/" + Defaults.PipelineDir + "/" + pipeline + Defaults.JsonFileExtension; + return readFileFromClasspath(settingsFile); + } - /** - * Read a pipeline in default classpath dir - * @param pipeline the id of the pipeline (.json will be appended) - * @return The pipeline content - * @throws IOException if the connection with elasticsearch is failing - */ - public static String readPipeline(String pipeline) throws IOException { - return readPipeline(Defaults.ConfigDir, pipeline); - } + /** + * Read a pipeline in default classpath dir + * @param pipeline the id of the pipeline (.json will be appended) + * @return The pipeline content + * @throws IOException if the connection with elasticsearch is failing + */ + public static String readPipeline(String pipeline) throws IOException { + return readPipeline(Defaults.ConfigDir, pipeline); + } } \ No newline at end of file