Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface ElasticSearchClient extends Closeable {

void createMapping(String indexName, String typeName, XContentBuilder mapping) throws IOException;

Map getMapping(String indexName, String typeName) throws IOException;

void deleteIndex(String indexName) throws IOException;

void bulkRequest(List<ElasticSearchMutation> requests) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigOption<String> BULK_REFRESH =
new ConfigOption<String>(ELASTICSEARCH_NS, "bulk-refresh",
"Elasticsearch bulk API refresh setting used to control when changes made by this request are made " +
"visible to search", ConfigOption.Type.MASKABLE, "false");
"visible to search", ConfigOption.Type.MASKABLE, "false");

public static final ConfigNamespace ES_EXTRAS_NS =
new ConfigNamespace(ELASTICSEARCH_NS, "ext", "Overrides for arbitrary elasticsearch.yaml settings", true);
Expand All @@ -168,6 +168,10 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigNamespace ES_CREATE_EXTRAS_NS =
new ConfigNamespace(ES_CREATE_NS, "ext", "Overrides for arbitrary settings applied at index creation", true);

public static final ConfigOption<Boolean> USE_EXTERNAL_MAPPINGS =
new ConfigOption<Boolean>(ES_CREATE_NS, "use-external-mappings",
"Whether JanusGraph should make use of an external mapping when registering an index.", ConfigOption.Type.MASKABLE, false);

private static final IndexFeatures ES_FEATURES = new IndexFeatures.Builder()
.setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.TEXTSTRING, Mapping.STRING).setWildcardField("_all").supportsCardinality(Cardinality.SINGLE).supportsCardinality(Cardinality.LIST).supportsCardinality(Cardinality.SET).supportsNanoseconds().supportsCustomAnalyzer().build();

Expand All @@ -189,9 +193,11 @@ public class ElasticSearchIndex implements IndexProvider {
private final String indexName;
private final int maxResultsSize;
private final String scriptLang;
private final boolean useExternalMappings;

public ElasticSearchIndex(Configuration config) throws BackendException {
indexName = config.get(INDEX_NAME);
useExternalMappings = config.get(USE_EXTERNAL_MAPPINGS);

checkExpectedClientVersion();

Expand Down Expand Up @@ -227,8 +233,8 @@ public ElasticSearchIndex(Configuration config) throws BackendException {
private void checkForOrCreateIndex(Configuration config) throws IOException {
Preconditions.checkState(null != client);

//Create index if it does not already exist
if (!client.indexExists(indexName)) {
//Create index if it does not useExternalMappings and if it does not already exist
if (!useExternalMappings && !client.indexExists(indexName)) {

Settings.Builder settings = Settings.builder();

Expand All @@ -243,8 +249,8 @@ private void checkForOrCreateIndex(Configuration config) throws IOException {
} catch (InterruptedException e) {
throw new JanusGraphException("Interrupted while waiting for index to settle in", e);
}
if (!client.indexExists(indexName)) throw new IllegalArgumentException("Could not create index: " + indexName);
}
if (!client.indexExists(indexName)) throw new IllegalArgumentException("Could not create index: " + indexName);
}


Expand Down Expand Up @@ -286,13 +292,37 @@ private static Map<Geo, ShapeRelation> spatialPredicates() {

@Override
public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException {
XContentBuilder mapping;
Class<?> dataType = information.getDataType();
Mapping map = Mapping.getMapping(information);
Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtil.isString(dataType) ||
(map==Mapping.PREFIX_TREE && AttributeUtil.isGeo(dataType)),
"Specified illegal mapping [%s] for data type [%s]",map,dataType);

if (useExternalMappings) {
try {
//We check if the externalMapping have the property 'key'
Map mappings = client.getMapping(indexName, store);
if (!mappings.containsKey(key)) {
throw new PermanentBackendException("The external mapping for index '"+indexName+"' and type '"+store+"' do not have property '"+key+"'");
}
} catch (IOException e) {
throw new PermanentBackendException(e);
}
} else {
this.pushMapping(store, key, information);
}
}

/**
* Push mapping to ElasticSearch
* @param store the type in the index
* @param key the name of the property in the index
* @param information information of the key
*/
private void pushMapping(String store, String key, KeyInformation information) throws AssertionError, PermanentBackendException, BackendException {
Class<?> dataType = information.getDataType();
Mapping map = Mapping.getMapping(information);
XContentBuilder mapping;
try {
mapping = XContentFactory.jsonBuilder().
startObject().
Expand All @@ -317,7 +347,7 @@ public void register(String store, String key, KeyInformation information, BaseT
if (textAnalyzer != null) {
mapping.field(ANALYZER, textAnalyzer);
}
break;
break;
case TEXTSTRING:
if (textAnalyzer != null) {
mapping.field(ANALYZER, textAnalyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
Expand Down Expand Up @@ -94,7 +96,13 @@ public Map getIndexSettings(String indexName) throws IOException {

@Override
public void createMapping(String indexName, String typeName, XContentBuilder mapping) throws IOException {
client.admin().indices().preparePutMapping(indexName).setType(typeName).setSource(mapping).execute().actionGet();
client.admin().indices().preparePutMapping(indexName).setType(typeName).setSource(mapping).execute().actionGet();
}

@Override
public Map getMapping(String indexName, String typeName) throws IOException {
GetMappingsResponse response = client.admin().indices().getMappings(new GetMappingsRequest().indices(indexName.toLowerCase()).types(typeName)).actionGet();
return (Map)response.getMappings().get(indexName.toLowerCase()).get(typeName).getSourceAsMap().get("properties");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ public void createMapping(String indexName, String typeName, XContentBuilder map
performRequest("PUT", "/" + indexName + "/_mapping/" + typeName, bytes);
}

@Override
public Map getMapping(String indexName, String typeName) throws IOException{
Response response = performRequest("GET", "/" + indexName.toLowerCase() + "/_mapping/"+typeName, null);
try (final InputStream inputStream = response.getEntity().getContent()) {
Map<String, RestIndexMappings> settings = mapper.readValue(inputStream, new TypeReference<Map<String, RestIndexMappings>>() {});
return settings.get(indexName).getMappings().get(typeName).getProperties();
}
}

@Override
public void deleteIndex(String indexName) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.es.rest;

import java.util.Map;

/**
* Serialization of Elasticsearch index mapping.
*
* @author David Clement (davidclement90@laposte.net)
*/
public class RestIndexMappings {

private Map<String, RestIndexMapping> mappings;

public Map<String, RestIndexMapping> getMappings() {
return mappings;
}

public void setMappings(Map<String, RestIndexMapping> mappings){
this.mappings = mappings;
}

public static class RestIndexMapping {

private Map<String, Object> properties;

public Map<String, Object> getProperties() {
return properties;
}

public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,29 @@
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.query.condition.PredicateCondition;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.base.Joiner;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.time.Duration;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS;
Expand All @@ -50,20 +67,37 @@ public class ElasticSearchConfigTest {

private static final String INDEX_NAME = "escfg";

private static final String ANALYZER_KEYWORD = "keyword";

private static final String ANALYZER_ENGLISH = "english";

private static final String ANALYZER_STANDARD = "standard";

private ElasticsearchRunner esr;

private int port;

private HttpHost host;

private CloseableHttpClient httpClient;

@Before
public void setup() throws Exception {
esr = new ElasticsearchRunner();
esr.start();
port = getInterface() == ElasticSearchSetup.REST_CLIENT ? 9200 : 9300;
httpClient = HttpClients.createDefault();
try {
host = new HttpHost(InetAddress.getByName("127.0.0.1"), 9200);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you get the port on line 88 but ignore the value and use 9200 across the board. is this ok?

Copy link
Contributor Author

@davidclement90 davidclement90 Jun 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's ok. Even if JanusGraph talks to Elasticsearch with the tranport API over port 9300, the mapping or the template are pushed with the REST API over port 9200.

} catch (UnknownHostException e) {
fail(e.getMessage());
}
}

@After
public void teardown() throws Exception {
esr.stop();
IOUtils.closeQuietly(httpClient);
}

public ElasticSearchSetup getInterface() {
Expand Down Expand Up @@ -128,11 +162,96 @@ public void testIndexCreationOptions() throws InterruptedException, BackendExcep
client.close();
}

@Test
public void testExternalMappingsViaMapping() throws BackendException {
final Duration maxWrite = Duration.ofMillis(2000L);
final String storeName = "test_mapping";
CommonsConfiguration cc = new CommonsConfiguration(new BaseConfiguration());
ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc,
BasicConfiguration.Restriction.NONE);
config.set(USE_EXTERNAL_MAPPINGS, true, INDEX_NAME);
Configuration indexConfig = config.restrictTo(INDEX_NAME);
FileInputStream fis = null;
try {
//Test create index KO mapping is not push
try {
new ElasticSearchIndex(indexConfig);
fail("should failed");
} catch (IllegalArgumentException e) {
}
HttpPut newMapping = new HttpPut("janusgraph");
fis = new FileInputStream(new File("src/test/resources/mapping.json"));
newMapping.setEntity(new StringEntity(Joiner.on("").join(IOUtils.readLines(fis)), Charset.forName("UTF-8")));
executeRequest(newMapping);

IndexProvider idx = new ElasticSearchIndex(indexConfig);
final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest
.getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD));
BaseTransactionConfig txConfig = StandardBaseTransactionConfig.of(TimestampProviders.MILLI);
IndexTransaction itx = new IndexTransaction(idx, indexRetriever, txConfig, maxWrite);

// Test date property OK
idx.register(storeName, "date", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("date"), itx);
// Test weight property KO
try {
idx.register(storeName, "weight", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("weight"), itx);
fail("should failed");
} catch (BackendException e) {
}
} catch (IOException e) {
fail(e.getMessage());
} finally {
IOUtils.closeQuietly(fis);
}
}

@Test
public void testExternalMappingsViaTemplate() throws BackendException {
FileInputStream fis = null;
try {
HttpPut newTemplate = new HttpPut("_template/template_1");
fis = new FileInputStream(new File("src/test/resources/template.json"));
newTemplate.setEntity(new StringEntity(Joiner.on("").join(IOUtils.readLines(fis)), Charset.forName("UTF-8")));
executeRequest(newTemplate);
HttpPut newMapping = new HttpPut("janusgraph");
executeRequest(newMapping);
final Duration maxWrite = Duration.ofMillis(2000L);
final String storeName = "test_mapping";
CommonsConfiguration cc = new CommonsConfiguration(new BaseConfiguration());
ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc,
BasicConfiguration.Restriction.NONE);
config.set(USE_EXTERNAL_MAPPINGS, true, INDEX_NAME);
Configuration indexConfig = config.restrictTo(INDEX_NAME);
IndexProvider idx = new ElasticSearchIndex(indexConfig);
final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest
.getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD));
BaseTransactionConfig txConfig = StandardBaseTransactionConfig.of(TimestampProviders.MILLI);
IndexTransaction itx = new IndexTransaction(idx, indexRetriever, txConfig, maxWrite);
// Test date property OK
idx.register(storeName, "date", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("date"), itx);
// Test weight property KO
try {
idx.register(storeName, "weight", IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_ENGLISH, ANALYZER_KEYWORD).get("weight"), itx);
fail("should failed");
} catch (BackendException e) {
}
} catch (IOException e) {
fail(e.getMessage());
} finally {
IOUtils.closeQuietly(fis);
try {
executeRequest(new HttpDelete("_template/template_1"));
} catch (IOException e) {
fail(e.getMessage());
}
}
}

private void simpleWriteAndQuery(IndexProvider idx) throws BackendException, InterruptedException {

final Duration maxWrite = Duration.ofMillis(2000L);
final String storeName = "jvmlocal_test_store";
final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest.getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), "standard", "keyword"));
final KeyInformation.IndexRetriever indexRetriever = IndexProviderTest.getIndexRetriever(IndexProviderTest.getMapping(idx.getFeatures(), ANALYZER_STANDARD, ANALYZER_KEYWORD));

BaseTransactionConfig txConfig = StandardBaseTransactionConfig.of(TimestampProviders.MILLI);
IndexTransaction itx = new IndexTransaction(idx, indexRetriever, txConfig, maxWrite);
Expand All @@ -145,4 +264,16 @@ private void simpleWriteAndQuery(IndexProvider idx) throws BackendException, Int
assertEquals(1, itx.query(new IndexQuery(storeName, PredicateCondition.of(IndexProviderTest.NAME, Text.PREFIX, "ali"))).size());
itx.rollback();
}

private void executeRequest(HttpRequestBase request) throws IOException {
CloseableHttpResponse res = null;
try {
res = httpClient.execute(host, request);
assertTrue(res.getStatusLine().getStatusCode() >= 200);
assertTrue(res.getStatusLine().getStatusCode() < 300);
assertFalse(EntityUtils.toString(res.getEntity()).contains("error"));
} finally {
IOUtils.closeQuietly(res);
}
}
}
Loading