From 5ffcdb6db4c3c91c9a34f15a0fa224bb3ba874b3 Mon Sep 17 00:00:00 2001 From: David Clement Date: Tue, 25 Apr 2017 16:41:25 +0200 Subject: [PATCH] Use external mapping ES instead of push mapping Signed-off-by: David Clement --- .../diskstorage/es/ElasticSearchClient.java | 2 + .../diskstorage/es/ElasticSearchIndex.java | 42 +++++- .../es/TransportElasticSearchClient.java | 10 +- .../es/rest/RestElasticSearchClient.java | 9 ++ .../es/rest/RestIndexMappings.java | 48 +++++++ .../es/ElasticSearchConfigTest.java | 133 +++++++++++++++++- janusgraph-es/src/test/resources/mapping.json | 46 ++++++ .../src/test/resources/template.json | 47 +++++++ 8 files changed, 329 insertions(+), 8 deletions(-) create mode 100644 janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestIndexMappings.java create mode 100644 janusgraph-es/src/test/resources/mapping.json create mode 100644 janusgraph-es/src/test/resources/template.json diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchClient.java index a2443de461..7965bf228e 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchClient.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchClient.java @@ -36,6 +36,8 @@ public interface ElasticSearchClient extends Closeable { void createMapping(String indexName, String typeName, XContentBuilder mapping) throws IOException; + Map getMapping(String indexName, String typeName) throws IOException; + void deleteIndex(String indexName) throws IOException; void bulkRequest(List requests) throws IOException; diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java index 5ba241c844..a43f56013b 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java @@ -150,7 +150,7 @@ public class ElasticSearchIndex implements IndexProvider { public static final ConfigOption BULK_REFRESH = new ConfigOption(ELASTICSEARCH_NS, "bulk-refresh", "Elasticsearch bulk API refresh setting used to control when changes made by this request are made " + - "visible to search", ConfigOption.Type.MASKABLE, "false"); + "visible to search", ConfigOption.Type.MASKABLE, "false"); public static final ConfigNamespace ES_EXTRAS_NS = new ConfigNamespace(ELASTICSEARCH_NS, "ext", "Overrides for arbitrary elasticsearch.yaml settings", true); @@ -168,6 +168,10 @@ public class ElasticSearchIndex implements IndexProvider { public static final ConfigNamespace ES_CREATE_EXTRAS_NS = new ConfigNamespace(ES_CREATE_NS, "ext", "Overrides for arbitrary settings applied at index creation", true); + public static final ConfigOption USE_EXTERNAL_MAPPINGS = + new ConfigOption(ES_CREATE_NS, "use-external-mappings", + "Whether JanusGraph should make use of an external mapping when registering an index.", ConfigOption.Type.MASKABLE, false); + private static final IndexFeatures ES_FEATURES = new IndexFeatures.Builder() .setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.TEXTSTRING, Mapping.STRING).setWildcardField("_all").supportsCardinality(Cardinality.SINGLE).supportsCardinality(Cardinality.LIST).supportsCardinality(Cardinality.SET).supportsNanoseconds().supportsCustomAnalyzer().build(); @@ -189,9 +193,11 @@ public class ElasticSearchIndex implements IndexProvider { private final String indexName; private final int maxResultsSize; private final String scriptLang; + private final boolean useExternalMappings; public ElasticSearchIndex(Configuration config) throws BackendException { indexName = config.get(INDEX_NAME); + useExternalMappings = config.get(USE_EXTERNAL_MAPPINGS); checkExpectedClientVersion(); @@ -227,8 +233,8 @@ public ElasticSearchIndex(Configuration config) throws BackendException { private void checkForOrCreateIndex(Configuration config) throws IOException { Preconditions.checkState(null != client); - //Create index if it does not already exist - if (!client.indexExists(indexName)) { + //Create index if it does not useExternalMappings and if it does not already exist + if (!useExternalMappings && !client.indexExists(indexName)) { Settings.Builder settings = Settings.builder(); @@ -243,8 +249,8 @@ private void checkForOrCreateIndex(Configuration config) throws IOException { } catch (InterruptedException e) { throw new JanusGraphException("Interrupted while waiting for index to settle in", e); } - if (!client.indexExists(indexName)) throw new IllegalArgumentException("Could not create index: " + indexName); } + if (!client.indexExists(indexName)) throw new IllegalArgumentException("Could not create index: " + indexName); } @@ -286,13 +292,37 @@ private static Map spatialPredicates() { @Override public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException { - XContentBuilder mapping; Class dataType = information.getDataType(); Mapping map = Mapping.getMapping(information); Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtil.isString(dataType) || (map==Mapping.PREFIX_TREE && AttributeUtil.isGeo(dataType)), "Specified illegal mapping [%s] for data type [%s]",map,dataType); + if (useExternalMappings) { + try { + //We check if the externalMapping have the property 'key' + Map mappings = client.getMapping(indexName, store); + if (!mappings.containsKey(key)) { + throw new PermanentBackendException("The external mapping for index '"+indexName+"' and type '"+store+"' do not have property '"+key+"'"); + } + } catch (IOException e) { + throw new PermanentBackendException(e); + } + } else { + this.pushMapping(store, key, information); + } + } + + /** + * Push mapping to ElasticSearch + * @param store the type in the index + * @param key the name of the property in the index + * @param information information of the key + */ + private void pushMapping(String store, String key, KeyInformation information) throws AssertionError, PermanentBackendException, BackendException { + Class dataType = information.getDataType(); + Mapping map = Mapping.getMapping(information); + XContentBuilder mapping; try { mapping = XContentFactory.jsonBuilder(). startObject(). @@ -317,7 +347,7 @@ public void register(String store, String key, KeyInformation information, BaseT if (textAnalyzer != null) { mapping.field(ANALYZER, textAnalyzer); } - break; + break; case TEXTSTRING: if (textAnalyzer != null) { mapping.field(ANALYZER, textAnalyzer); diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/TransportElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/TransportElasticSearchClient.java index ade2fa213d..930d4bec76 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/TransportElasticSearchClient.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/TransportElasticSearchClient.java @@ -18,6 +18,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -94,7 +96,13 @@ public Map getIndexSettings(String indexName) throws IOException { @Override public void createMapping(String indexName, String typeName, XContentBuilder mapping) throws IOException { - client.admin().indices().preparePutMapping(indexName).setType(typeName).setSource(mapping).execute().actionGet(); + client.admin().indices().preparePutMapping(indexName).setType(typeName).setSource(mapping).execute().actionGet(); + } + + @Override + public Map getMapping(String indexName, String typeName) throws IOException { + GetMappingsResponse response = client.admin().indices().getMappings(new GetMappingsRequest().indices(indexName.toLowerCase()).types(typeName)).actionGet(); + return (Map)response.getMappings().get(indexName.toLowerCase()).get(typeName).getSourceAsMap().get("properties"); } @Override diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java index 9b3a3aa9ca..e137814fa9 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java @@ -156,6 +156,15 @@ public void createMapping(String indexName, String typeName, XContentBuilder map performRequest("PUT", "/" + indexName + "/_mapping/" + typeName, bytes); } + @Override + public Map getMapping(String indexName, String typeName) throws IOException{ + Response response = performRequest("GET", "/" + indexName.toLowerCase() + "/_mapping/"+typeName, null); + try (final InputStream inputStream = response.getEntity().getContent()) { + Map settings = mapper.readValue(inputStream, new TypeReference>() {}); + return settings.get(indexName).getMappings().get(typeName).getProperties(); + } + } + @Override public void deleteIndex(String indexName) throws IOException { try { diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestIndexMappings.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestIndexMappings.java new file mode 100644 index 0000000000..2b81410e7e --- /dev/null +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestIndexMappings.java @@ -0,0 +1,48 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.es.rest; + +import java.util.Map; + +/** + * Serialization of Elasticsearch index mapping. + * + * @author David Clement (davidclement90@laposte.net) + */ +public class RestIndexMappings { + + private Map mappings; + + public Map getMappings() { + return mappings; + } + + public void setMappings(Map mappings){ + this.mappings = mappings; + } + + public static class RestIndexMapping { + + private Map properties; + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + } +} diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java index 3d23341ec7..4c6dc5b808 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java @@ -31,12 +31,29 @@ import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; import org.janusgraph.graphdb.query.condition.PredicateCondition; import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpHost; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Joiner; + +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.Charset; import java.time.Duration; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS; @@ -50,20 +67,37 @@ public class ElasticSearchConfigTest { private static final String INDEX_NAME = "escfg"; + private static final String ANALYZER_KEYWORD = "keyword"; + + private static final String ANALYZER_ENGLISH = "english"; + + private static final String ANALYZER_STANDARD = "standard"; + private ElasticsearchRunner esr; private int port; + private HttpHost host; + + private CloseableHttpClient httpClient; + @Before public void setup() throws Exception { esr = new ElasticsearchRunner(); esr.start(); port = getInterface() == ElasticSearchSetup.REST_CLIENT ? 9200 : 9300; + httpClient = HttpClients.createDefault(); + try { + host = new HttpHost(InetAddress.getByName("127.0.0.1"), 9200); + } catch (UnknownHostException e) { + fail(e.getMessage()); + } } @After public void teardown() throws Exception { esr.stop(); + IOUtils.closeQuietly(httpClient); } public ElasticSearchSetup getInterface() { @@ -128,11 +162,96 @@ public void testIndexCreationOptions() throws InterruptedException, BackendExcep client.close(); } + @Test + public void testExternalMappingsViaMapping() throws BackendException { + final Duration maxWrite = Duration.ofMillis(2000L); + final String storeName = "test_mapping"; + CommonsConfiguration cc = new CommonsConfiguration(new BaseConfiguration()); + ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, + BasicConfiguration.Restriction.NONE); + config.set(USE_EXTERNAL_MAPPINGS, true, INDEX_NAME); + Configuration indexConfig = config.restrictTo(INDEX_NAME); + FileInputStream fis = null; + try { + //Test create index KO mapping is not push + try { + new ElasticSearchIndex(indexConfig); + fail("should failed"); + } catch (IllegalArgumentException e) { + } + HttpPut newMapping = new HttpPut("janusgraph"); + fis = new FileInputStream(new File("src/test/resources/mapping.json")); + newMapping.setEntity(new StringEntity(Joiner.on("").join(IOUtils.readLines(fis)), Charset.forName("UTF-8"))); + executeRequest(newMapping); + + IndexProvider idx = new ElasticSearchIndex(indexConfig); + final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest + .getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD)); + BaseTransactionConfig txConfig = StandardBaseTransactionConfig.of(TimestampProviders.MILLI); + IndexTransaction itx = new IndexTransaction(idx, indexRetriever, txConfig, maxWrite); + + // Test date property OK + idx.register(storeName, "date", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("date"), itx); + // Test weight property KO + try { + idx.register(storeName, "weight", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("weight"), itx); + fail("should failed"); + } catch (BackendException e) { + } + } catch (IOException e) { + fail(e.getMessage()); + } finally { + IOUtils.closeQuietly(fis); + } + } + + @Test + public void testExternalMappingsViaTemplate() throws BackendException { + FileInputStream fis = null; + try { + HttpPut newTemplate = new HttpPut("_template/template_1"); + fis = new FileInputStream(new File("src/test/resources/template.json")); + newTemplate.setEntity(new StringEntity(Joiner.on("").join(IOUtils.readLines(fis)), Charset.forName("UTF-8"))); + executeRequest(newTemplate); + HttpPut newMapping = new HttpPut("janusgraph"); + executeRequest(newMapping); + final Duration maxWrite = Duration.ofMillis(2000L); + final String storeName = "test_mapping"; + CommonsConfiguration cc = new CommonsConfiguration(new BaseConfiguration()); + ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, + BasicConfiguration.Restriction.NONE); + config.set(USE_EXTERNAL_MAPPINGS, true, INDEX_NAME); + Configuration indexConfig = config.restrictTo(INDEX_NAME); + IndexProvider idx = new ElasticSearchIndex(indexConfig); + final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest + .getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD)); + BaseTransactionConfig txConfig = StandardBaseTransactionConfig.of(TimestampProviders.MILLI); + IndexTransaction itx = new IndexTransaction(idx, indexRetriever, txConfig, maxWrite); + // Test date property OK + idx.register(storeName, "date", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("date"), itx); + // Test weight property KO + try { + idx.register(storeName, "weight", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("weight"), itx); + fail("should failed"); + } catch (BackendException e) { + } + } catch (IOException e) { + fail(e.getMessage()); + } finally { + IOUtils.closeQuietly(fis); + try { + executeRequest(new HttpDelete("_template/template_1")); + } catch (IOException e) { + fail(e.getMessage()); + } + } + } + private void simpleWriteAndQuery(IndexProvider idx) throws BackendException, InterruptedException { final Duration maxWrite = Duration.ofMillis(2000L); final String storeName = "jvmlocal_test_store"; - final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest.getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), "standard", "keyword")); + final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest.getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_STANDARD, ANALYZER_KEYWORD)); BaseTransactionConfig txConfig = StandardBaseTransactionConfig.of(TimestampProviders.MILLI); IndexTransaction itx = new IndexTransaction(idx, indexRetriever, txConfig, maxWrite); @@ -145,4 +264,16 @@ private void simpleWriteAndQuery(IndexProvider idx) throws BackendException, Int assertEquals(1, itx.query(new IndexQuery(storeName, PredicateCondition.of(IndexProviderTest.NAME, Text.PREFIX, "ali"))).size()); itx.rollback(); } + + private void executeRequest(HttpRequestBase request) throws IOException { + CloseableHttpResponse res = null; + try { + res = httpClient.execute(host, request); + assertTrue(res.getStatusLine().getStatusCode() >= 200); + assertTrue(res.getStatusLine().getStatusCode() < 300); + assertFalse(EntityUtils.toString(res.getEntity()).contains("error")); + } finally { + IOUtils.closeQuietly(res); + } + } } diff --git a/janusgraph-es/src/test/resources/mapping.json b/janusgraph-es/src/test/resources/mapping.json new file mode 100644 index 0000000000..650f98b8f1 --- /dev/null +++ b/janusgraph-es/src/test/resources/mapping.json @@ -0,0 +1,46 @@ +{ + "mappings": { + "test_mapping": { + "properties": { + "boundary": { + "type": "geo_shape", + "tree": "quadtree", + "tree_levels": 20, + "distance_error_pct": 0.025 + }, + "date": { + "type": "date" + }, + "location": { + "type": "geo_point" + }, + "name": { + "type": "string", + "index": "not_analyzed" + }, + "phone_list": { + "type": "string", + "index": "not_analyzed" + }, + "phone_set": { + "type": "string", + "index": "not_analyzed" + }, + "text": { + "type": "string" + }, + "time": { + "type": "long" + } + } + } + }, + "settings": { + "index": { + "number_of_shards": "5", + "max_result_window": "2147483647", + "number_of_replicas": "1" + } + } +} + diff --git a/janusgraph-es/src/test/resources/template.json b/janusgraph-es/src/test/resources/template.json new file mode 100644 index 0000000000..68edeb802f --- /dev/null +++ b/janusgraph-es/src/test/resources/template.json @@ -0,0 +1,47 @@ +{ + "template": "janusgraph*", + "mappings": { + "test_mapping": { + "properties": { + "boundary": { + "type": "geo_shape", + "tree": "quadtree", + "tree_levels": 20, + "distance_error_pct": 0.025 + }, + "date": { + "type": "date" + }, + "location": { + "type": "geo_point" + }, + "name": { + "type": "string", + "index": "not_analyzed" + }, + "phone_list": { + "type": "string", + "index": "not_analyzed" + }, + "phone_set": { + "type": "string", + "index": "not_analyzed" + }, + "text": { + "type": "string" + }, + "time": { + "type": "long" + } + } + } + }, + "settings": { + "index": { + "number_of_shards": "5", + "max_result_window": "2147483647", + "number_of_replicas": "1" + } + } +} +