Skip to content

Commit 9065584

Browse files
committed
Merge pull request #3 from allegro/reindex_ttl_field
#2 | reindex _ttl field
2 parents 584e388 + 4e7d358 commit 9065584

File tree

10 files changed

+165
-25
lines changed

10 files changed

+165
-25
lines changed

.travis.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
language: java
2+
3+
script:
4+
- "./gradlew clean build"
5+
6+
jdk:
7+
- oraclejdk8
8+
9+
addons:
10+
apt:
11+
packages:
12+
- oracle-java8-installer

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,8 @@ modifyPom {
9797
}
9898
}
9999
}
100+
}
101+
102+
task wrapper(type: Wrapper) {
103+
gradleVersion = '2.6'
100104
}

gradle/wrapper/gradle-wrapper.jar

2.56 KB
Binary file not shown.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#Fri Dec 05 22:03:44 CET 2014
1+
#Fri Sep 04 11:11:50 CEST 2015
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-2.3-all.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-2.6-bin.zip

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/IndexingComponent.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ private BulkRequestBuilder createBulkRequestBuilder() {
2626
return client.prepareBulk();
2727
}
2828

29-
public Optional<BulkResult> indexData(ElasticDataPointer dataPointer, SearchHit[] hits) {
29+
public Optional<BulkResult> indexData(ElasticDataPointer targetDataPointer, SearchHit[] hits) {
3030
BulkRequestBuilder bulkRequest = createBulkRequestBuilder();
3131

3232
for (SearchHit hit : hits) {
3333
Map<String, Object> source = hit.getSource();
34-
IndexRequestBuilder requestBuilder = prepareIndex(dataPointer.getIndexName(), dataPointer
34+
IndexRequestBuilder requestBuilder = prepareIndex(targetDataPointer.getIndexName(), targetDataPointer
3535
.getTypeName(), hit.getId());
36+
if (hit.getFields().get("_ttl") != null) {
37+
requestBuilder.setTTL(hit.getFields().get("_ttl").value());
38+
}
3639
requestBuilder.setSource(source);
3740
bulkRequest.add(requestBuilder);
3841
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/QueryComponent.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class QueryComponent {
1515

1616
public static final int SCROLL_TIME_LIMIT = 60000;
1717
public static final int SCROLL_SHARD_LIMIT = 200;
18-
public static final int SCROOL_TIMEOUT = 600000;
18+
public static final int SCROLL_TIMEOUT = 600000;
1919

2020
private Client client;
2121
private Optional<String> segmentationField;
@@ -34,6 +34,7 @@ public SearchResponse prepareSearchScrollRequest() {
3434
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(dataPointer.getIndexName())
3535
.setTypes(dataPointer.getTypeName())
3636
.setSearchType(SearchType.SCAN)
37+
.addFields("_ttl", "_source")
3738
.setScroll(new TimeValue(SCROLL_TIME_LIMIT))
3839
.setSize(SCROLL_SHARD_LIMIT);
3940

@@ -45,7 +46,7 @@ public SearchResponse prepareSearchScrollRequest() {
4546

4647
public SearchResponse getNextScrolledSearchResults(String scrollId) {
4748
return client.prepareSearchScroll(scrollId)
48-
.setScroll(new TimeValue(SCROOL_TIMEOUT))
49+
.setScroll(new TimeValue(SCROLL_TIMEOUT))
4950
.get();
5051
}
5152

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package pl.allegro.tech.search.elasticsearch.tools.reindex;
2+
3+
import com.google.common.collect.ImmutableMap;
4+
import org.elasticsearch.action.search.SearchResponse;
5+
import org.elasticsearch.common.xcontent.XContentBuilder;
6+
import org.elasticsearch.common.xcontent.XContentFactory;
7+
import org.elasticsearch.search.SearchHitField;
8+
import org.junit.AfterClass;
9+
import org.junit.Before;
10+
import org.junit.BeforeClass;
11+
import org.junit.Test;
12+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
13+
import pl.allegro.tech.search.elasticsearch.tools.reindex.embeded.EmbeddedElasticsearchCluster;
14+
import pl.allegro.tech.search.elasticsearch.tools.reindex.embeded.IndexDocument;
15+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.EmptySegmentation;
16+
17+
import java.io.IOException;
18+
import java.util.Map;
19+
import java.util.concurrent.ExecutionException;
20+
import java.util.stream.IntStream;
21+
import java.util.stream.Stream;
22+
23+
import static org.assertj.core.api.Assertions.*;
24+
25+
public class TTLTest {
26+
27+
private static final String SOURCE_INDEX = "sourceindex";
28+
private static final String TARGET_INDEX = "targetindex";
29+
private static final String DATA_TYPE = "type";
30+
31+
32+
private static EmbeddedElasticsearchCluster embeddedElasticsearchCluster;
33+
34+
@BeforeClass
35+
public static void setUp() throws Exception {
36+
embeddedElasticsearchCluster = EmbeddedElasticsearchCluster.createDataNode();
37+
}
38+
39+
@AfterClass
40+
public static void tearDown() throws Exception {
41+
embeddedElasticsearchCluster.close();
42+
}
43+
44+
@Before
45+
public void clearTargetIndex() {
46+
embeddedElasticsearchCluster.deleteIndex(SOURCE_INDEX);
47+
embeddedElasticsearchCluster.deleteIndex(TARGET_INDEX);
48+
}
49+
50+
@Test
51+
public void shouldReindexTTL() throws ExecutionException, InterruptedException {
52+
//given
53+
embeddedElasticsearchCluster.createIndex(SOURCE_INDEX, DATA_TYPE, mappingWithTTL());
54+
embeddedElasticsearchCluster.createIndex(TARGET_INDEX, DATA_TYPE, mappingWithTTL());
55+
indexSampleDataWithTTL();
56+
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
57+
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
58+
59+
//when
60+
ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, EmptySegmentation.createEmptySegmentation());
61+
SearchResponse targetResponse = embeddedElasticsearchCluster.client().prepareSearch(TARGET_INDEX).addFields("_ttl").get();
62+
63+
//then
64+
assertThat(embeddedElasticsearchCluster.count(SOURCE_INDEX)).isEqualTo(1L);
65+
assertThat(embeddedElasticsearchCluster.count(TARGET_INDEX)).isEqualTo(1L);
66+
67+
Map<String, SearchHitField> resultFields = targetResponse.getHits().getAt(0).getFields();
68+
assertThat(resultFields.containsKey("_ttl"));
69+
assertThat((Long) resultFields.get("_ttl").value() > 0L);
70+
}
71+
72+
private void indexSampleDataWithTTL() {
73+
Stream<IndexDocument> streamToBeIndexed = IntStream
74+
.range(1, 2)
75+
.mapToObj(
76+
i -> {
77+
Long ttl = 60000L;
78+
return new IndexDocument(Integer.toString(i), ImmutableMap.of("fieldName", i), ttl);
79+
}
80+
);
81+
82+
streamToBeIndexed.forEach(
83+
indexDocument -> embeddedElasticsearchCluster.indexDocument(SOURCE_INDEX, DATA_TYPE, indexDocument)
84+
);
85+
embeddedElasticsearchCluster.refreshIndex();
86+
}
87+
88+
private XContentBuilder mappingWithTTL() {
89+
try {
90+
// @formatter:off
91+
//How to enable it in intellij see it here: http://stackoverflow.com/questions/3375307/how-to-disable-code-formatting-for-some-part-of-the-code-using-comments
92+
return XContentFactory.jsonBuilder()
93+
.startObject()
94+
.startObject("_ttl").field("enabled", true).endObject()
95+
.endObject();
96+
// @formatter:off
97+
} catch (IOException e) {
98+
throw new RuntimeException("Failed building index mappingDef", e);
99+
}
100+
}
101+
102+
103+
}

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/embeded/EmbeddedElasticsearchCluster.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.embeded;
22

3-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
4-
import pl.allegro.tech.search.elasticsearch.tools.reindex.ReindexInvokerTest;
5-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerBuilder;
3+
import org.elasticsearch.action.index.IndexRequestBuilder;
4+
import org.elasticsearch.client.Client;
65
import org.elasticsearch.client.IndicesAdminClient;
76
import org.elasticsearch.common.settings.ImmutableSettings;
87
import org.elasticsearch.common.xcontent.XContentBuilder;
98
import org.elasticsearch.node.Node;
109
import org.elasticsearch.node.NodeBuilder;
10+
import pl.allegro.tech.search.elasticsearch.tools.reindex.ReindexInvokerTest;
11+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
12+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerBuilder;
1113

12-
import java.util.Map;
1314
import java.util.stream.Stream;
1415

1516
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
@@ -51,18 +52,25 @@ public void close() {
5152
dataNode.close();
5253
}
5354

55+
public Client client() {
56+
return dataNode.client();
57+
}
58+
5459
public void deleteIndex(String targetIndex) {
5560
IndicesAdminClient indices = dataNode.client().admin().indices();
5661
if (indices.prepareExists(targetIndex).get().isExists()) {
5762
indices.prepareDelete(targetIndex).get();
5863
}
5964
}
6065

61-
public void indexDocument(String index, String type, String id, Map doc) {
62-
dataNode.client().prepareIndex(index, type, id).setSource(doc).get();
66+
public void indexDocument(String index, String type, IndexDocument indexDocument) {
67+
IndexRequestBuilder requestBuilder = dataNode.client().prepareIndex(index, type, indexDocument.getId()).setSource(indexDocument.getDoc());
68+
if (indexDocument.getTTL() != null) {
69+
requestBuilder.setTTL(indexDocument.getTTL());
70+
}
71+
requestBuilder.get();
6372
}
6473

65-
6674
public boolean indexExist(String index) {
6775
return dataNode.client().admin().indices().prepareExists(index).get().isExists();
6876
}
@@ -82,13 +90,12 @@ public ElasticDataPointer createDataPointer(String indexName) {
8290
public void indexWithSampleData(String sourceIndex, String type, Stream<IndexDocument> indexDocumentStream) {
8391
recreateIndex(sourceIndex);
8492
indexDocumentStream.forEach(
85-
indexDocument ->
86-
indexDocument(sourceIndex, type, indexDocument.getId(), indexDocument.getDoc())
93+
indexDocument -> indexDocument(sourceIndex, type, indexDocument)
8794
);
88-
refreshIndex(sourceIndex);
95+
refreshIndex();
8996
}
9097

91-
private void refreshIndex(String index) {
98+
public void refreshIndex() {
9299
dataNode.client().admin().indices().prepareRefresh().get();
93100
}
94101

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/embeded/IndexDocument.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,28 @@
44

55
public class IndexDocument {
66
private final String id;
7-
private final Map<String, ?> doc;
7+
private final Map<String, Object> doc;
8+
private final Long ttl;
89

9-
public IndexDocument(String id, Map<String, ?> doc) {
10+
public IndexDocument(String id, Map<String, Object> doc, Long ttl) {
1011
this.id = id;
1112
this.doc = doc;
13+
this.ttl = ttl != null && ttl > 0 ? ttl : null;
14+
}
15+
16+
public IndexDocument(String id, Map<String, Object> doc) {
17+
this(id, doc, null);
1218
}
1319

1420
public String getId() {
1521
return id;
1622
}
1723

18-
public Map<String, ?> getDoc() {
24+
public Map<String, Object> getDoc() {
1925
return doc;
2026
}
27+
28+
public Long getTTL() {
29+
return ttl;
30+
}
2131
}

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/QueryProcessTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void shouldNotFillQueueWhenQueryResultEmpty() throws Exception {
3434
public void shouldFillQueueWhenQueryResultNotEmptyInOneChunk() throws Exception {
3535
//given
3636
SearchResponse searchResponse = new SearchResponse();
37-
ProcessSynchronizer processSynchronizer = createProcessSynchronizerMock(searchResponse);
37+
ProcessSynchronizer processSynchronizer = createProcessSynchronizerMock();
3838
QueryComponent queryComponent = mock(QueryComponent.class);
3939
when(queryComponent.prepareSearchScrollRequest()).thenReturn(searchResponse);
4040
when(queryComponent.searchResultsNotEmpty(searchResponse)).thenReturn(true);
@@ -50,8 +50,8 @@ public void shouldFillQueueWhenQueryResultNotEmptyInOneChunk() throws Exception
5050
@Test
5151
public void shouldFillQueueWhenQueryResultNotEmptyInTwoChunks() throws Exception {
5252
//given
53-
SearchResponse searchResponse = createaSearchResponseWithScrollId("scrollId");
54-
ProcessSynchronizer processSynchronizer = createProcessSynchronizerMock(searchResponse);
53+
SearchResponse searchResponse = createSearchResponseWithScrollId("scrollId");
54+
ProcessSynchronizer processSynchronizer = createProcessSynchronizerMock();
5555
QueryComponent queryComponent = mock(QueryComponent.class);
5656
when(queryComponent.prepareSearchScrollRequest()).thenReturn(searchResponse);
5757
when(queryComponent.searchResultsNotEmpty(searchResponse)).thenReturn(true);
@@ -66,11 +66,11 @@ public void shouldFillQueueWhenQueryResultNotEmptyInTwoChunks() throws Exception
6666
verify(processSynchronizer).subtractWorkingQueryProcess();
6767
}
6868

69-
private SearchResponse createaSearchResponseWithScrollId(String scrollId) {
69+
private SearchResponse createSearchResponseWithScrollId(String scrollId) {
7070
return new SearchResponse(InternalSearchResponse.empty(), scrollId, 1, 1, 1, new ShardSearchFailure[0]);
7171
}
7272

73-
private ProcessSynchronizer createProcessSynchronizerMock(SearchResponse searchResponse) {
73+
private ProcessSynchronizer createProcessSynchronizerMock() {
7474
ProcessSynchronizer processSynchronizer = mock(ProcessSynchronizer.class);
7575
when(processSynchronizer.tryFillQueueWithSearchHits(any(SearchResponse.class))).thenReturn(true);
7676
return processSynchronizer;

0 commit comments

Comments
 (0)