Skip to content

Commit 98ecc0d

Browse files
committed
Merge branch 'rharing-adding_sortField_in_preparation_of_incremental_reindex'
2 parents 080346e + af9af5c commit 98ecc0d

22 files changed

+413
-41
lines changed

README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ With segmentation by prefix on string field:
6767
cluster_name1 -segmentationField userId -segmentationPrefixes 1,2,3,4,5,6,7`
6868

6969
In this example index querying will divide data into segments based on the first character of the userId field: 1,2,3,4,5,6,7
70+
71+
With query option:
72+
73+
`./run.sh -s http://host:9300/index/type -t http://host1:9300/index1/type1 -sc cluster_name -tc
74+
cluster_name1 -query "{\"range\": {\"_timestamp\" : {\"gte\" : 1447142880000}}}" -sort _timestamp -sortOrder DESC`
75+
76+
In this example, only documents matching the query are read from the source index, and they are reindexed in the order given by the sort field and sort order
7077

7178
Options:
7279

@@ -84,7 +91,13 @@ Options:
8491
Segmentation prefixes (comma-separated)
8592
-segmentationThresholds
8693
Segmentation thresholds (only double type)
87-
94+
-query
95+
Give a query to filter data
96+
-sort
97+
Give field to sort on (if query option in use)
98+
-sortOrder
99+
Give sortOrder (if query option in use)
100+
88101
`segmentationField`, `segmentationThresholds` and `segmentationPrefixes` are optional parameters which make it possible to spread
89102
querying for field with double values or prefix for string field
90103

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexCommandParser.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@
44
import com.beust.jcommander.ParameterException;
55
import pl.allegro.tech.search.elasticsearch.tools.reindex.command.ReindexCommand;
66
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
7-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ParsingElasticsearchAddressException;
8-
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentationFactory;
97
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerBuilder;
8+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
9+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQueryBuilder;
10+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ParsingElasticsearchAddressException;
1011
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentation;
12+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentationFactory;
1113

1214
public class ReindexCommandParser {
1315

1416
private ElasticDataPointer sourcePointer;
1517
private ElasticDataPointer targetPointer;
1618
private QuerySegmentation segmentation;
1719

18-
1920
public boolean tryParse(String... args) {
2021
ReindexCommand command = new ReindexCommand();
2122
JCommander jCommander = new JCommander(command);
@@ -62,4 +63,5 @@ public ElasticDataPointer getTargetPointer() {
6263
public QuerySegmentation getSegmentation() {
6364
return segmentation;
6465
}
66+
6567
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexInvoker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.slf4j.LoggerFactory;
66
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
77
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchClientFactory;
8+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
89
import pl.allegro.tech.search.elasticsearch.tools.reindex.process.IndexingComponent;
910
import pl.allegro.tech.search.elasticsearch.tools.reindex.process.IndexingProcessBuilder;
1011
import pl.allegro.tech.search.elasticsearch.tools.reindex.process.ProcessConfiguration;
@@ -79,11 +80,12 @@ private void startQueriesProcesses(Client client, ElasticDataPointer sourcePoint
7980
.setDataPointer(sourcePointer)
8081
.setSegmentationField(segmentation.getFieldName())
8182
.setBound(segmentation.getThreshold(i))
83+
.setQuery(segmentation.getQuery())
8284
.createQueryComponent()
8385
).map(
8486
queryComponent -> new QueryProcess(processSynchronizer, queryComponent)
8587
).forEach(
86-
queryProcess -> processExecutor.startProcess(queryProcess)
88+
processExecutor::startProcess
8789
);
8890
}
8991

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/command/ReindexCommand.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ public class ReindexCommand {
2222
@Parameter(names = { "-segmentationField" }, description = "Segmentation field")
2323
private String segmentationField;
2424

25+
@Parameter(names = { "-query" }, description = "Give a query to filter data")
26+
private String query;
27+
28+
@Parameter(names = { "-sort" }, description = "Give field to sort on (if query option in use)")
29+
private String sort;
30+
31+
@Parameter(names = { "-sortOrder" }, description = "Give sortOrder (if query option in use)")
32+
private String sortOrder;
33+
2534
@Parameter(names = { "-segmentationThresholds" }, description = "Segmentation thresholds (only double type)")
2635
private List<Double> segmentationThresholds;
2736

@@ -38,6 +47,18 @@ public String getSegmentationField() {
3847
return segmentationField;
3948
}
4049

50+
public String getQuery() {
51+
return query;
52+
}
53+
54+
public String getSort() {
55+
return sort;
56+
}
57+
58+
public String getSortOrder() {
59+
return sortOrder;
60+
}
61+
4162
public List<Double> getSegmentationThresholds() {
4263
return segmentationThresholds;
4364
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package pl.allegro.tech.search.elasticsearch.tools.reindex.connection;
2+
3+
import org.elasticsearch.search.sort.SortOrder;
4+
5+
public class ElasticSearchQuery {
6+
private final String query;
7+
private final String sortField;
8+
private final SortOrder sortOrder;
9+
10+
ElasticSearchQuery(String query, String sortField, SortOrder sortOrder) {
11+
this.query = query;
12+
this.sortField = sortField;
13+
this.sortOrder = sortOrder;
14+
}
15+
16+
public String getQuery() {
17+
return query;
18+
}
19+
20+
public SortOrder getSortOrder() {
21+
return sortOrder;
22+
}
23+
24+
public String getSortField() {
25+
return sortField;
26+
}
27+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package pl.allegro.tech.search.elasticsearch.tools.reindex.connection;
2+
3+
import com.google.common.base.Strings;
4+
import org.elasticsearch.search.sort.SortOrder;
5+
6+
public class ElasticSearchQueryBuilder {
7+
private String query;
8+
private String sortField;
9+
private SortOrder sortOrder = SortOrder.ASC;
10+
11+
12+
private ElasticSearchQueryBuilder() {
13+
}
14+
15+
public ElasticSearchQueryBuilder setSortOrder(String sortOrder) {
16+
if (!Strings.isNullOrEmpty(sortOrder)) {
17+
try {
18+
this.sortOrder = SortOrder.valueOf(sortOrder);
19+
} catch (IllegalArgumentException e) {
20+
throw new ParsingElasticsearchAddressException("SortOrder can be only ASC or DESC, not " + sortOrder);
21+
}
22+
}
23+
return this;
24+
}
25+
26+
public ElasticSearchQueryBuilder setQuery(String query) {
27+
this.query = query;
28+
return this;
29+
}
30+
31+
public ElasticSearchQueryBuilder setSortByField(String orderByField) {
32+
this.sortField = orderByField;
33+
return this;
34+
}
35+
36+
public ElasticSearchQuery build() {
37+
return new ElasticSearchQuery(query, sortField, sortOrder);
38+
}
39+
40+
public static ElasticSearchQueryBuilder builder() {
41+
return new ElasticSearchQueryBuilder();
42+
}
43+
44+
45+
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/QueryComponent.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.process;
22

3-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
4-
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
5-
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.filter.BoundedFilterFactory;
3+
import com.google.common.base.Strings;
64
import org.elasticsearch.action.search.SearchRequestBuilder;
75
import org.elasticsearch.action.search.SearchResponse;
86
import org.elasticsearch.action.search.SearchType;
97
import org.elasticsearch.client.Client;
108
import org.elasticsearch.common.unit.TimeValue;
9+
import org.elasticsearch.search.sort.FieldSortBuilder;
10+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
11+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
12+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
13+
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.filter.BoundedFilterFactory;
1114

1215
import java.util.Optional;
1316

@@ -21,13 +24,15 @@ public class QueryComponent {
2124
private Optional<String> segmentationField;
2225
private ElasticDataPointer dataPointer;
2326
private Optional<BoundedSegment> bound;
27+
private ElasticSearchQuery query;
2428
private BoundedFilterFactory boundedFilterFactory = new BoundedFilterFactory();
2529

26-
QueryComponent(Client client, ElasticDataPointer dataPointer, Optional<String> segmentationField, Optional<BoundedSegment> bound) {
30+
QueryComponent(Client client, ElasticDataPointer dataPointer, Optional<String> segmentationField, Optional<BoundedSegment> bound, ElasticSearchQuery query) {
2731
this.client = client;
2832
this.dataPointer = dataPointer;
2933
this.segmentationField = segmentationField;
3034
this.bound = bound;
35+
this.query = query;
3136
}
3237

3338
public SearchResponse prepareSearchScrollRequest() {
@@ -38,6 +43,13 @@ public SearchResponse prepareSearchScrollRequest() {
3843
.setScroll(new TimeValue(SCROLL_TIME_LIMIT))
3944
.setSize(SCROLL_SHARD_LIMIT);
4045

46+
if (!Strings.isNullOrEmpty(query.getQuery())) {
47+
searchRequestBuilder.setQuery(query.getQuery());
48+
}
49+
if (!Strings.isNullOrEmpty(query.getSortField())) {
50+
searchRequestBuilder.addSort(new FieldSortBuilder(query.getSortField()).order(query.getSortOrder()));
51+
}
52+
4153
bound.map(resolvedBound -> boundedFilterFactory.createBoundedFilter(segmentationField.get(), resolvedBound))
4254
.ifPresent(searchRequestBuilder::setQuery);
4355

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/process/QueryComponentBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.process;
22

33
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
4+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
45
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
56
import org.elasticsearch.client.Client;
67

@@ -11,6 +12,7 @@ public final class QueryComponentBuilder {
1112
private ElasticDataPointer dataPointer;
1213
private Optional<String> segmentationField = Optional.empty();
1314
private Optional<BoundedSegment> bound = Optional.empty();
15+
private ElasticSearchQuery query;
1416

1517
private QueryComponentBuilder() {
1618
}
@@ -35,11 +37,18 @@ public QueryComponentBuilder setBound(Optional<BoundedSegment> bound) {
3537
return this;
3638
}
3739

40+
public QueryComponentBuilder setQuery(ElasticSearchQuery query) {
41+
this.query = query;
42+
return this;
43+
}
44+
45+
3846
public static QueryComponentBuilder builder() {
3947
return new QueryComponentBuilder();
4048
}
4149

4250
public QueryComponent createQueryComponent() {
43-
return new QueryComponent(client, dataPointer, segmentationField, bound);
51+
return new QueryComponent(client, dataPointer, segmentationField, bound, query);
4452
}
53+
4554
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/query/DoubleFieldSegmentation.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.query;
22

3+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
4+
35
import java.util.Collections;
46
import java.util.List;
57
import java.util.Optional;
68

7-
public final class DoubleFieldSegmentation implements QuerySegmentation {
9+
public final class DoubleFieldSegmentation extends SegmentationQueryTrait implements QuerySegmentation {
810

911
private final List<Double> thresholds;
1012

1113
private final Optional<String> fieldName;
1214

13-
private DoubleFieldSegmentation(String fieldName, List<Double> thresholds) {
15+
private DoubleFieldSegmentation(String fieldName, List<Double> thresholds, ElasticSearchQuery query) {
16+
super(query);
1417
this.fieldName = Optional.of(fieldName);
1518
this.thresholds = Collections.unmodifiableList(thresholds);
1619
}
@@ -35,7 +38,7 @@ public Optional<BoundedSegment> getThreshold(int i) {
3538
return Optional.of(segmentation);
3639
}
3740

38-
public static DoubleFieldSegmentation create(String fieldName, List<Double> thresholds) {
39-
return new DoubleFieldSegmentation(fieldName, thresholds);
41+
public static DoubleFieldSegmentation create(String fieldName, List<Double> thresholds, ElasticSearchQuery query) {
42+
return new DoubleFieldSegmentation(fieldName, thresholds, query);
4043
}
4144
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/query/EmptySegmentation.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.query;
22

3-
import java.util.Optional;
3+
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
44

5-
public final class EmptySegmentation implements QuerySegmentation {
5+
import java.util.Optional;
66

7-
public static final EmptySegmentation EMPTY_SEGMENTATION_INSTANCE = new EmptySegmentation();
7+
public final class EmptySegmentation extends SegmentationQueryTrait implements QuerySegmentation {
88

9-
private EmptySegmentation() {
9+
private EmptySegmentation(ElasticSearchQuery query) {
10+
super(query);
1011
}
1112

1213
@Override
@@ -24,8 +25,8 @@ public Optional<BoundedSegment> getThreshold(int i) {
2425
return Optional.empty();
2526
}
2627

27-
public static EmptySegmentation createEmptySegmentation() {
28-
return EMPTY_SEGMENTATION_INSTANCE;
28+
public static EmptySegmentation createEmptySegmentation(ElasticSearchQuery query) {
29+
return new EmptySegmentation(query);
2930
}
3031

3132
}

0 commit comments

Comments
 (0)