Skip to content

Commit 6956ef5

Browse files
committed
added query and sort field to create an incremental reindex
1 parent a30f75f commit 6956ef5

File tree

11 files changed

+168
-24
lines changed

11 files changed

+168
-24
lines changed

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ public static void main(String[] args) {
1212
ReindexInvoker.invokeReindexing(
1313
commandParser.getSourcePointer(),
1414
commandParser.getTargetPointer(),
15-
commandParser.getSegmentation());
15+
commandParser.getSegmentation(),
16+
commandParser.getQuery());
1617
}
1718
}
1819
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexCommandParser.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@
44
import com.beust.jcommander.ParameterException;
55
import pl.allegro.tech.search.elasticsearch.tools.reindex.command.ReindexCommand;
66
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
7-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ParsingElasticsearchAddressException;
8-
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentationFactory;
97
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerBuilder;
8+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
9+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ParsingElasticsearchAddressException;
1010
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentation;
11+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentationFactory;
1112

1213
public class ReindexCommandParser {
1314

1415
private ElasticDataPointer sourcePointer;
1516
private ElasticDataPointer targetPointer;
1617
private QuerySegmentation segmentation;
18+
private ElasticSearchQuery query;
1719

1820

1921
public boolean tryParse(String... args) {
@@ -45,6 +47,7 @@ private void buildReindexParameters(ReindexCommand command) {
4547
.setAddress(command.getTarget())
4648
.build();
4749
segmentation = getFieldSegmentation(command);
50+
query = new ElasticSearchQuery.ElasticSearchQueryBuilder(command).build();
4851
}
4952

5053
private QuerySegmentation getFieldSegmentation(ReindexCommand command) {
@@ -62,4 +65,8 @@ public ElasticDataPointer getTargetPointer() {
6265
public QuerySegmentation getSegmentation() {
6366
return segmentation;
6467
}
68+
69+
public ElasticSearchQuery getQuery() {
70+
return query;
71+
}
6572
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexInvoker.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.slf4j.LoggerFactory;
66
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
77
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchClientFactory;
8+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
89
import pl.allegro.tech.search.elasticsearch.tools.reindex.process.IndexingComponent;
910
import pl.allegro.tech.search.elasticsearch.tools.reindex.process.IndexingProcessBuilder;
1011
import pl.allegro.tech.search.elasticsearch.tools.reindex.process.ProcessConfiguration;
@@ -31,20 +32,20 @@ public ReindexInvoker(int querySegmentCount) {
3132
}
3233

3334
public static ReindexingSummary invokeReindexing(ElasticDataPointer sourcePointer, ElasticDataPointer targetPointer, QuerySegmentation
34-
segmentation) {
35+
segmentation, ElasticSearchQuery query) {
3536
ReindexInvoker reindexInvoker = new ReindexInvoker(segmentation.getSegmentsCount());
3637
LOGGER.info("Starting");
37-
ReindexingSummary summary = reindexInvoker.run(sourcePointer, targetPointer, segmentation);
38+
ReindexingSummary summary = reindexInvoker.run(sourcePointer, targetPointer, segmentation, query);
3839
LOGGER.info("Ended");
3940
return summary;
4041
}
4142

42-
public ReindexingSummary run(ElasticDataPointer sourcePointer, ElasticDataPointer targetPointer, QuerySegmentation segmentation) {
43+
public ReindexingSummary run(ElasticDataPointer sourcePointer, ElasticDataPointer targetPointer, QuerySegmentation segmentation, ElasticSearchQuery query) {
4344
Client sourceClient = ElasticSearchClientFactory.createClient(sourcePointer);
4445
Client targetClient = ElasticSearchClientFactory.createClient(targetPointer);
4546

4647
if (indexExists(sourceClient, sourcePointer.getIndexName())) {
47-
startQueriesProcesses(sourceClient, sourcePointer, segmentation);
48+
startQueriesProcesses(sourceClient, sourcePointer, segmentation, query);
4849
startUpdatesProcesses(targetClient, targetPointer);
4950
processSynchronizer.waitForProcessesToEnd();
5051
}
@@ -70,7 +71,7 @@ private void startUpdatesProcesses(Client client, ElasticDataPointer targetPoint
7071
);
7172
}
7273

73-
private void startQueriesProcesses(Client client, ElasticDataPointer sourcePointer, QuerySegmentation segmentation) {
74+
private void startQueriesProcesses(Client client, ElasticDataPointer sourcePointer, QuerySegmentation segmentation, ElasticSearchQuery query) {
7475
IntStream.range(0, segmentation.getSegmentsCount())
7576
.mapToObj(
7677
i ->
@@ -79,11 +80,12 @@ private void startQueriesProcesses(Client client, ElasticDataPointer sourcePoint
7980
.setDataPointer(sourcePointer)
8081
.setSegmentationField(segmentation.getFieldName())
8182
.setBound(segmentation.getThreshold(i))
83+
.setQuery(query)
8284
.createQueryComponent()
8385
).map(
8486
queryComponent -> new QueryProcess(processSynchronizer, queryComponent)
8587
).forEach(
86-
queryProcess -> processExecutor.startProcess(queryProcess)
88+
processExecutor::startProcess
8789
);
8890
}
8991

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/command/ReindexCommand.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ public class ReindexCommand {
2222
@Parameter(names = { "-segmentationField" }, description = "Segmentation field")
2323
private String segmentationField;
2424

25+
@Parameter(names = { "-query" }, description = "Give a query to start from")
26+
private String query;
27+
28+
@Parameter(names = { "-sort" }, description = "Give field to sort on")
29+
private String sort;
30+
31+
@Parameter(names = { "-sortOrder" }, description = "Give sortOrder")
32+
private String sortOrder;
33+
2534
@Parameter(names = { "-segmentationThresholds" }, description = "Segmentation thresholds (only double type)")
2635
private List<Double> segmentationThresholds;
2736

@@ -38,6 +47,18 @@ public String getSegmentationField() {
3847
return segmentationField;
3948
}
4049

50+
public String getQuery() {
51+
return query;
52+
}
53+
54+
public String getSort() {
55+
return sort;
56+
}
57+
58+
public String getSortOrder() {
59+
return sortOrder;
60+
}
61+
4162
public List<Double> getSegmentationThresholds() {
4263
return segmentationThresholds;
4364
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package pl.allegro.tech.search.elasticsearch.tools.reindex.connection;
2+
3+
import org.elasticsearch.search.sort.FieldSortBuilder;
4+
import org.elasticsearch.search.sort.SortBuilder;
5+
import org.elasticsearch.search.sort.SortOrder;
6+
import pl.allegro.tech.search.elasticsearch.tools.reindex.command.ReindexCommand;
7+
8+
/**
9+
* Used when starting the reindex from a specific point.
10+
*/
11+
public class ElasticSearchQuery {
12+
private final String query;
13+
private final SortBuilder sort;
14+
15+
public ElasticSearchQuery(String query) {
16+
this(query, null);
17+
}
18+
19+
public ElasticSearchQuery(String query, SortBuilder sort) {
20+
this.query = query;
21+
this.sort = sort;
22+
}
23+
24+
public String getQuery() {
25+
return query;
26+
}
27+
28+
public SortBuilder getSort() {
29+
return sort;
30+
}
31+
32+
public static class ElasticSearchQueryBuilder {
33+
private String query;
34+
private String sort;
35+
private SortOrder sortOrder;
36+
public ElasticSearchQueryBuilder(ReindexCommand reindexCommand) {
37+
if (reindexCommand != null) {
38+
this.query = reindexCommand.getQuery();
39+
this.sort = reindexCommand.getSort();
40+
try {
41+
this.sortOrder = SortOrder.valueOf(reindexCommand.getSortOrder());
42+
} catch (IllegalArgumentException | NullPointerException e) {
43+
this.sortOrder = SortOrder.ASC;
44+
}
45+
}
46+
}
47+
48+
public ElasticSearchQuery build() {
49+
return new ElasticSearchQuery(query, new FieldSortBuilder(sort).order(sortOrder));
50+
}
51+
}
52+
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/QueryComponent.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.process;
22

3-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
4-
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
5-
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.filter.BoundedFilterFactory;
63
import org.elasticsearch.action.search.SearchRequestBuilder;
74
import org.elasticsearch.action.search.SearchResponse;
85
import org.elasticsearch.action.search.SearchType;
96
import org.elasticsearch.client.Client;
107
import org.elasticsearch.common.unit.TimeValue;
8+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
9+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
10+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
11+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.filter.BoundedFilterFactory;
1112

1213
import java.util.Optional;
1314

@@ -21,13 +22,15 @@ public class QueryComponent {
2122
private Optional<String> segmentationField;
2223
private ElasticDataPointer dataPointer;
2324
private Optional<BoundedSegment> bound;
25+
private ElasticSearchQuery query;
2426
private BoundedFilterFactory boundedFilterFactory = new BoundedFilterFactory();
2527

26-
QueryComponent(Client client, ElasticDataPointer dataPointer, Optional<String> segmentationField, Optional<BoundedSegment> bound) {
28+
QueryComponent(Client client, ElasticDataPointer dataPointer, Optional<String> segmentationField, Optional<BoundedSegment> bound, ElasticSearchQuery query) {
2729
this.client = client;
2830
this.dataPointer = dataPointer;
2931
this.segmentationField = segmentationField;
3032
this.bound = bound;
33+
this.query = query;
3134
}
3235

3336
public SearchResponse prepareSearchScrollRequest() {
@@ -38,6 +41,13 @@ public SearchResponse prepareSearchScrollRequest() {
3841
.setScroll(new TimeValue(SCROLL_TIME_LIMIT))
3942
.setSize(SCROLL_SHARD_LIMIT);
4043

44+
if (query != null && query.getQuery() != null && !"".equals(query.getQuery())) {
45+
searchRequestBuilder.setQuery(query.getQuery());
46+
}
47+
if (query != null && query.getSort() != null) {
48+
searchRequestBuilder.addSort(query.getSort());
49+
}
50+
4151
bound.map(resolvedBound -> boundedFilterFactory.createBoundedFilter(segmentationField.get(), resolvedBound))
4252
.ifPresent(searchRequestBuilder::setQuery);
4353

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/QueryComponentBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.process;
22

33
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
4+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
45
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
56
import org.elasticsearch.client.Client;
67

@@ -11,6 +12,7 @@ public final class QueryComponentBuilder {
1112
private ElasticDataPointer dataPointer;
1213
private Optional<String> segmentationField = Optional.empty();
1314
private Optional<BoundedSegment> bound = Optional.empty();
15+
private ElasticSearchQuery query;
1416

1517
private QueryComponentBuilder() {
1618
}
@@ -35,11 +37,18 @@ public QueryComponentBuilder setBound(Optional<BoundedSegment> bound) {
3537
return this;
3638
}
3739

40+
public QueryComponentBuilder setQuery(ElasticSearchQuery query) {
41+
this.query = query;
42+
return this;
43+
}
44+
45+
3846
public static QueryComponentBuilder builder() {
3947
return new QueryComponentBuilder();
4048
}
4149

4250
public QueryComponent createQueryComponent() {
43-
return new QueryComponent(client, dataPointer, segmentationField, bound);
51+
return new QueryComponent(client, dataPointer, segmentationField, bound, query);
4452
}
53+
4554
}

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexInvokerTest.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22

33
import com.google.common.collect.ImmutableMap;
44
import com.google.common.collect.Lists;
5+
import org.elasticsearch.search.sort.FieldSortBuilder;
56
import org.junit.AfterClass;
67
import org.junit.Before;
78
import org.junit.BeforeClass;
89
import org.junit.Test;
910
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
11+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
1012
import pl.allegro.tech.search.elasticsearch.tools.reindex.embeded.EmbeddedElasticsearchCluster;
1113
import pl.allegro.tech.search.elasticsearch.tools.reindex.embeded.IndexDocument;
1214
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.DoubleFieldSegmentation;
@@ -52,8 +54,9 @@ public void indexingWithoutSegmentingEmpty() throws Exception {
5254
embeddedElasticsearchCluster.recreateIndex(SOURCE_INDEX);
5355
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
5456
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
57+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("");
5558
//when
56-
ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, EmptySegmentation.createEmptySegmentation());
59+
ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, EmptySegmentation.createEmptySegmentation(), elasticSearchQuery);
5760
//then
5861
assertFalse(embeddedElasticsearchCluster.indexExist(TARGET_INDEX));
5962
}
@@ -64,9 +67,10 @@ public void indexingWithSegmentingEmpty() throws Exception {
6467
embeddedElasticsearchCluster.recreateIndex(SOURCE_INDEX);
6568
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
6669
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
70+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("");
6771
//when
6872
ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, DoubleFieldSegmentation.create("fieldName",
69-
Lists.newArrayList(1.0, 3.0)));
73+
Lists.newArrayList(1.0, 3.0)), elasticSearchQuery);
7074
//then
7175
assertFalse(embeddedElasticsearchCluster.indexExist(TARGET_INDEX));
7276
}
@@ -77,8 +81,10 @@ public void indexingWithoutSegmenting() throws Exception {
7781
indexWithSampleData();
7882
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
7983
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
84+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("");
8085
//when
81-
ReindexingSummary reindexingSummary = ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, EmptySegmentation.createEmptySegmentation());
86+
ReindexingSummary reindexingSummary = ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer,
87+
EmptySegmentation.createEmptySegmentation(), elasticSearchQuery);
8288
//then
8389
assertEquals(9L, embeddedElasticsearchCluster.count(TARGET_INDEX));
8490
assertThat(reindexingSummary)
@@ -94,9 +100,10 @@ public void indexingWithSegmentingByDoubleField() throws Exception {
94100
indexWithSampleData();
95101
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
96102
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
103+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("");
97104
//when
98105
ReindexingSummary reindexingSummary = ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, DoubleFieldSegmentation.create("fieldName",
99-
Lists.newArrayList(1.0, 3.0, 7.0)));
106+
Lists.newArrayList(1.0, 3.0, 7.0)), elasticSearchQuery);
100107
//then
101108
assertEquals(6L, embeddedElasticsearchCluster.count(TARGET_INDEX));
102109
assertThat(reindexingSummary)
@@ -111,9 +118,10 @@ public void indexingWithSegmentingByPrefixOnStringField() throws Exception {
111118
indexWithSampleData();
112119
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
113120
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
121+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("");
114122
//when
115123
ReindexingSummary reindexingSummary = ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, StringPrefixSegmentation.create("fieldName",
116-
Lists.newArrayList("1", "2", "3", "4")));
124+
Lists.newArrayList("1", "2", "3", "4")), elasticSearchQuery);
117125
//then
118126
assertEquals(4L, embeddedElasticsearchCluster.count(TARGET_INDEX));
119127
assertThat(reindexingSummary)
@@ -122,6 +130,26 @@ public void indexingWithSegmentingByPrefixOnStringField() throws Exception {
122130
.hasFailedIndexedCount(0L);
123131
}
124132

133+
134+
@Test
135+
public void indexingWithStartQuery() throws Exception {
136+
//given
137+
indexWithSampleData();
138+
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
139+
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
140+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("" +
141+
"{\"range\": {\"fieldName\" : { \"gte\" : \"5\"}}}", new FieldSortBuilder("fieldName"));
142+
//when
143+
ReindexingSummary reindexingSummary = ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer,
144+
EmptySegmentation.createEmptySegmentation(), elasticSearchQuery);
145+
//then
146+
assertEquals(5L, embeddedElasticsearchCluster.count(TARGET_INDEX));
147+
assertThat(reindexingSummary)
148+
.hasIndexedCount(5L)
149+
.hasQueriedCount(5L)
150+
.hasFailedIndexedCount(0L);
151+
}
152+
125153
private void indexWithSampleData() {
126154
Stream<IndexDocument> streamToBeIndexed = IntStream
127155
.range(1, 10)
@@ -137,8 +165,9 @@ public void tryingReindexNotExistingIndex() throws Exception {
137165
embeddedElasticsearchCluster.deleteIndex(SOURCE_INDEX);
138166
ElasticDataPointer sourceDataPointer = embeddedElasticsearchCluster.createDataPointer(SOURCE_INDEX);
139167
ElasticDataPointer targetDataPointer = embeddedElasticsearchCluster.createDataPointer(TARGET_INDEX);
168+
ElasticSearchQuery elasticSearchQuery = embeddedElasticsearchCluster.createInitialQuery("");
140169
//when
141-
ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, EmptySegmentation.createEmptySegmentation());
170+
ReindexInvoker.invokeReindexing(sourceDataPointer, targetDataPointer, EmptySegmentation.createEmptySegmentation(), elasticSearchQuery);
142171
//then
143172
assertFalse(embeddedElasticsearchCluster.indexExist(TARGET_INDEX));
144173

0 commit comments

Comments
 (0)