Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,34 @@ By default, Beyonder will not overwrite a template if it already exists.
This can be overridden by setting `force` to `true` in the expanded factory method
`ElasticsearchBeyonder.start()`.

Managing pipelines
------------------

A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared while
documents are being indexed. Please note that this feature is only supported when you use the REST client not the Transport client.

For example, setting one fields value based on another field by using an Set Processor you an add a file named `elasticsearch/_pipeline/set_field_processor`
in your project:

```javascript
{
"description" : "Twitter pipeline",
"processors" : [
{
"set" : {
"field": "copy",
"value": "{{otherField}}"
}
}
]
}
```

By default, Beyonder will not overwrite a pipeline if it already exists.
This can be overridden by setting `force` to `true` in the expanded factory method
`ElasticsearchBeyonder.start()`.


Why this name?
==============

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults;
import fr.pilato.elasticsearch.tools.index.IndexFinder;
import fr.pilato.elasticsearch.tools.template.TemplateFinder;
import fr.pilato.elasticsearch.tools.pipeline.PipelineFinder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
Expand All @@ -33,6 +34,7 @@
import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.createIndex;
import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.updateSettings;
import static fr.pilato.elasticsearch.tools.template.TemplateElasticsearchUpdater.createTemplate;
import static fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater.createPipeline;

/**
* By default, indexes are created with their default Elasticsearch settings. You can specify
Expand Down Expand Up @@ -109,6 +111,12 @@ public static void start(RestClient client, String root, boolean merge, boolean
createTemplate(client, root, templateName, force);
}

// create pipelines
List<String> pipelineNames = PipelineFinder.findPipelines(root);
for (String pipelineName : pipelineNames) {
createPipeline(client, root, pipelineName, force);
}

// create indices
Collection<String> indexNames = IndexFinder.findIndexNames(root);
for (String indexName : indexNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static class Defaults {
public static String IndexSettingsFileName = "_settings.json";
public static String UpdateIndexSettingsFileName = "_update_settings.json";
public static String TemplateDir = "_template";
public static String PipelineDir = "_pipeline";

/**
* Default setting of whether or not to merge mappings on start.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static List<String> findIndexNames(final String root) throws IOException,
} else {
key = resource;
}
if (!key.equals(Defaults.TemplateDir) && !keys.contains(key)) {
if (!key.equals(Defaults.TemplateDir) && !key.equals(Defaults.PipelineDir) && !keys.contains(key)) {
logger.trace(" - found [{}].", key);
keys.add(key);
indexNames.add(key);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package fr.pilato.elasticsearch.tools.pipeline;

import java.io.IOException;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handles ingest pipeline creation.
*
* @author hjk181
*
*/
public class PipelineElasticsearchUpdater {

private static final Logger logger = LoggerFactory.getLogger(PipelineElasticsearchUpdater.class);

/**
* Create a pipeline in Elasticsearch.
*
* @param client Elasticsearch client
* @param root dir within the classpath
* @param pipeline the id of the pipeline
* @param force set it to true if you want to force cleaning pipeline before adding it
* @throws Exception if something goes wrong
*/
public static void createPipeline(RestClient client, String root, String pipeline, boolean force) throws Exception {
String json = PipelineSettingsReader.readPipeline(root, pipeline);
createPipelineWithJson(client, pipeline, json, force);
}

/**
* Create a pipeline in Elasticsearch. Read content from default classpath dir.
*
* @param client Elasticsearch client
* @param pipeline the id of the pipeline
* @param force set it to true if you want to force cleaning pipeline before adding it
* @throws Exception if something goes wrong
*/
public static void createPipeline(RestClient client, String pipeline, boolean force) throws Exception {
String json = PipelineSettingsReader.readPipeline(pipeline);
createPipelineWithJson(client, pipeline, json, force);
}

/**
* Create a new pipeline in Elasticsearch
*
* @param client Elasticsearch client
* @param pipeline the id of the pipeline
* @param json JSon content for the pipeline
* @param force set it to true if you want to force cleaning pipeline before adding it
* @throws Exception if something goes wrong
*/
public static void createPipelineWithJson(RestClient client, String pipeline, String json, boolean force) throws Exception {
if (isPipelineExist(client, pipeline)) {
if (force) {
logger.debug("Pipeline [{}] already exists. Force is set. Overriding it.", pipeline);
createPipelineWithJsonInElasticsearch(client, pipeline, json);
}
else {
logger.debug("Pipeline [{}] already exists.", pipeline);
}
}

if (!isPipelineExist(client, pipeline)) {
logger.debug("Pipeline [{}] doesn't exist. Creating it.", pipeline);
createPipelineWithJsonInElasticsearch(client, pipeline, json);
}
}

/**
* Create a new pipeline in Elasticsearch.
*
* @param client Elasticsearch client
* @param pipeline the id of the pipeline
* @param json JSon content for the pipeline
* @throws Exception if something goes wrong
*/
private static void createPipelineWithJsonInElasticsearch(RestClient client, String pipeline, String json) throws Exception {
logger.trace("createPipeline([{}])", pipeline);

assert client != null;
assert pipeline != null;

Request request = new Request("PUT", "/_ingest/pipeline/" + pipeline);
request.setJsonEntity(json);
Response response = client.performRequest(request);

if (response.getStatusLine().getStatusCode() != 200) {
logger.warn("Could not create pipeline [{}]", pipeline);
throw new Exception("Could not create pipeline [" + pipeline + "].");
}

logger.trace("/createPipeline([{}])", pipeline);
}

/**
* Check if a pipeline exists
*
* @param client Elasticsearch client
* @param pipeline the id of the pipeline
* @return true if the pipeline exists
* @throws IOException if something goes wrong
*/
public static boolean isPipelineExist(RestClient client, String pipeline) throws IOException {
try {
Response response = client.performRequest(new Request("GET", "/_ingest/pipeline/" + pipeline));
return response.getEntity() != null;
}
catch (ResponseException e) {
if (404 != e.getResponse().getStatusLine().getStatusCode()) {
throw e;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package fr.pilato.elasticsearch.tools.pipeline;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import fr.pilato.elasticsearch.tools.ResourceList;
import fr.pilato.elasticsearch.tools.SettingsFinder;
import fr.pilato.elasticsearch.tools.template.TemplateFinder;

/**
* Findes ingest pipelines on the classpath.
*
* @author hjk181
*
*/
public class PipelineFinder extends SettingsFinder {

private static final Logger logger = LoggerFactory.getLogger(TemplateFinder.class);

/**
* Find all pipelines in default classpath dir
*
* @return a list of pipelines
* @throws IOException if connection with elasticsearch is failing
* @throws URISyntaxException this should not happen
*/
public static List<String> findPipelines() throws IOException, URISyntaxException {
return findPipelines(Defaults.ConfigDir);
}

/**
* Find all pipelines
*
* @param root dir within the classpath
* @return a list of pipelines
* @throws IOException if connection with elasticsearch is failing
* @throws URISyntaxException this should not happen
*/
public static List<String> findPipelines(String root) throws IOException, URISyntaxException {
if (root == null) {
return findPipelines();
}

logger.debug("Looking for pipelines in classpath under [{}].", root);

final List<String> pipelineNames = new ArrayList<>();
String[] resources = ResourceList.getResources(root + "/" + Defaults.PipelineDir + "/"); // "es/_pipeline/"
for (String resource : resources) {
if (!resource.isEmpty()) {
String withoutIndex = resource.substring(resource.indexOf("/") + 1);
String pipeline = withoutIndex.substring(0, withoutIndex.indexOf(Defaults.JsonFileExtension));
logger.trace(" - found [{}].", pipeline);
pipelineNames.add(pipeline);
}
}

return pipelineNames;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package fr.pilato.elasticsearch.tools.pipeline;

import java.io.IOException;

import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults;
import fr.pilato.elasticsearch.tools.SettingsReader;

/**
* Manage elasticsearch pipeline files.
*
* @author hjk181
*/
public class PipelineSettingsReader extends SettingsReader {

/**
* Read a pipeline
* @param root dir within the classpath
* @param pipeline the id of the pipeline (.json will be appended)
* @return The pipeline content
* @throws IOException if the connection with elasticsearch is failing
*/
public static String readPipeline(String root, String pipeline) throws IOException {
if (root == null) {
return readPipeline(pipeline);
}
String settingsFile = root + "/" + Defaults.PipelineDir + "/" + pipeline + Defaults.JsonFileExtension;
return readFileFromClasspath(settingsFile);
}

/**
* Read a pipeline in default classpath dir
* @param pipeline the id of the pipeline (.json will be appended)
* @return The pipeline content
* @throws IOException if the connection with elasticsearch is failing
*/
public static String readPipeline(String pipeline) throws IOException {
return readPipeline(Defaults.ConfigDir, pipeline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import fr.pilato.elasticsearch.tools.alias.AliasElasticsearchUpdater;
import fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater;
import fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.elasticsearch.client.Request;
Expand Down Expand Up @@ -142,6 +144,12 @@ public void testForceEnabled() throws Exception {
String newMapping = getMapping("twitter");
assertThat(newMapping, is(not(oldMapping)));
}

@Test
public void testPipeline() throws Exception {
ElasticsearchBeyonder.start(client, "models/pipeline");
assertThat(PipelineElasticsearchUpdater.isPipelineExist(client, "twitter_pipeline"), is(true));
}

private String getMapping(String indexName) throws IOException {
HttpEntity response = client.performRequest(new Request("GET", indexName + "/_mapping")).getEntity();
Expand Down
11 changes: 11 additions & 0 deletions src/test/resources/models/pipeline/_pipeline/twitter_pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"description" : "Twitter pipeline",
"processors" : [
{
"set" : {
"field": "foo",
"value": "bar"
}
}
]
}