Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.search.elasticsearch.tools.reindex;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
Expand Down Expand Up @@ -32,7 +33,7 @@ public ReindexInvoker(int querySegmentCount) {
}

public static ReindexingSummary invokeReindexing(ElasticDataPointer sourcePointer, ElasticDataPointer targetPointer, QuerySegmentation
segmentation) {
segmentation) {
ReindexInvoker reindexInvoker = new ReindexInvoker(segmentation.getSegmentsCount());
LOGGER.info("Starting");
ReindexingSummary summary = reindexInvoker.run(sourcePointer, targetPointer, segmentation);
Expand Down Expand Up @@ -62,30 +63,30 @@ private boolean indexExists(Client sourceClient, String indexName) {

/**
 * Starts the indexing processes that write into the target.
 * One process is started for every index in
 * {@code [0, ProcessConfiguration.getInstance().getUpdateThreadsCount())}; each process is built
 * with its own {@code IndexingComponent} around the given client, the target data pointer and
 * the shared {@code processSynchronizer}.
 *
 * @param client        client wrapped by every newly created {@code IndexingComponent}
 * @param targetPointer data pointer handed to each indexing process builder
 */
private void startUpdatesProcesses(Client client, ElasticDataPointer targetPointer) {
IntStream.range(0, ProcessConfiguration.getInstance().getUpdateThreadsCount()).forEach(
i -> processExecutor.startProcess(
IndexingProcessBuilder.builder()
.setIndexingComponent(new IndexingComponent(client))
.setDataPointer(targetPointer)
.setProcessSynchronizer(processSynchronizer)
.build())
// NOTE(review): the lambda below repeats the one above token for token - looks like both sides
// of a whitespace-only diff hunk; confirm and keep a single copy.
i -> processExecutor.startProcess(
IndexingProcessBuilder.builder()
.setIndexingComponent(new IndexingComponent(client))
.setDataPointer(targetPointer)
.setProcessSynchronizer(processSynchronizer)
.build())
);
}

/**
 * Starts one query process per segment of the given segmentation.
 * For every segment index a {@code QueryComponent} is built from the client, the source data
 * pointer, the segmentation field name, the segment's bound and a query; the component is wrapped
 * in a {@code QueryProcess} sharing {@code processSynchronizer} and handed to
 * {@code processExecutor.startProcess}.
 *
 * @param client        client used by every query component
 * @param sourcePointer data pointer the query components read from
 * @param segmentation  supplies segment count, field name, per-segment bound and query
 */
private void startQueriesProcesses(Client client, ElasticDataPointer sourcePointer, QuerySegmentation segmentation) {
IntStream.range(0, segmentation.getSegmentsCount())
.mapToObj(
i ->
QueryComponentBuilder.builder()
.setClient(client)
.setDataPointer(sourcePointer)
.setSegmentationField(segmentation.getFieldName())
.setBound(segmentation.getThreshold(i))
.setQuery(segmentation.getQuery())
.createQueryComponent()
).map(
queryComponent -> new QueryProcess(processSynchronizer, queryComponent)
// NOTE(review): the chain below repeats the one above except that it asks for the per-segment
// query via getQuery(i) - looks like the old and new side of a diff hunk; confirm which applies.
.mapToObj(
i ->
QueryComponentBuilder.builder()
.setClient(client)
.setDataPointer(sourcePointer)
.setSegmentationField(segmentation.getFieldName())
.setBound(segmentation.getThreshold(i))
.setQuery(segmentation.getQuery(i))
.createQueryComponent()
).map(
queryComponent -> new QueryProcess(processSynchronizer, queryComponent)
).forEach(
processExecutor::startProcess
processExecutor::startProcess
);
}

Expand All @@ -101,7 +102,7 @@ private void refreshTargetIndex(Client targetClient) {

/**
 * Calls {@code close()} on each of the given clients, in argument order.
 *
 * @param clients clients to close
 */
private void disconnectElasticsearchClients(Client... clients) {
Arrays.asList(clients)
.forEach(Client::close);
.forEach(Client::close);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import com.beust.jcommander.Parameter;

import java.util.Collections;
import java.util.List;

public class ReindexCommand {

@Parameter(names = { "-s", "source" }, description = "Source f.e. http://localhost:9300/source_index/type",
required = true)
required = true)
private String source;

@Parameter(names = { "-sc", "source-cluster" }, description = "Source cluster name", required = true)
Expand All @@ -16,7 +17,7 @@ public class ReindexCommand {
@Parameter(names = { "-tc", "target-cluster" }, description = "Target cluster name", required = true)
private String targetClusterName;
@Parameter(names = { "-t", "target" }, description = "Target f.e. http://localhost:9300/target_index/type",
required = true)
required = true)
private String target;

@Parameter(names = { "-segmentationField" }, description = "Segmentation field")
Expand All @@ -25,6 +26,9 @@ public class ReindexCommand {
@Parameter(names = { "-query" }, description = "Give a query to filter data")
private String query;

@Parameter(names = { "-shards" }, description = "Select the shards that will accept the query")
private List<Integer> shards = Collections.emptyList();

@Parameter(names = { "-sort" }, description = "Give field to sort on (if query option in use)")
private String sort;

Expand All @@ -37,6 +41,9 @@ public class ReindexCommand {
@Parameter(names = { "-segmentationPrefixes" }, description = "Segmentation prefixes (comma-separated)")
private List<String> segmentationPrefixes;

// JCommander treats a boolean field as a zero-arity flag: the option is switched on by its mere
// presence and a following "true"/"false" token is not consumed as its value, so the help text
// must not advertise a value.
@Parameter(names = { "-segmentationByShards" }, description = "Segmentation by shards (flag without value; enabled when present)")
private boolean segmentationByShards = false;

/**
 * @return the value of the {@code sourceClusterName} field
 */
public String getSourceClusterName() {
return sourceClusterName;
}
Expand All @@ -51,6 +58,10 @@ public String getQuery() {
return query;
}

/**
 * Returns the shard numbers bound to the {@code -shards} option.
 *
 * @return the current content of the {@code shards} field (initialised to an empty list)
 */
public List<Integer> getShards() {
    return shards;
}

/**
 * @return the field name given with {@code -sort}, as stored in the {@code sort} field
 */
public String getSort() {
return sort;
}
Expand All @@ -67,6 +78,8 @@ public List<String> getSegmentationPrefixes() {
return segmentationPrefixes;
}

/**
 * Tells whether the {@code -segmentationByShards} option is switched on.
 *
 * @return the value of the {@code segmentationByShards} field ({@code false} by default)
 */
public boolean getSegmentationByShards() {
    return segmentationByShards;
}

/**
 * @return the source location given with {@code -s}/{@code source}
 */
public String getSource() {
return source;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package pl.allegro.tech.search.elasticsearch.tools.reindex.connection;

import org.apache.commons.collections4.ArrayStack;
import org.elasticsearch.search.sort.SortOrder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.StreamSupport;

public class ElasticSearchQuery {
private final String query;
private final String sortField;
private final SortOrder sortOrder;
private final List<Integer> shards;

/**
 * Package-private constructor; stores the query text, sort settings and shard list.
 * The shard list is kept behind {@code Collections.unmodifiableList}, i.e. a read-only view of
 * the list passed in rather than a copy, and a {@code null} list is rejected with a
 * {@code NullPointerException} by that call.
 *
 * NOTE(review): two signature lines follow (with and without {@code shards}) - looks like the old
 * and new side of a diff hunk; the body matches the four-argument form.
 */
ElasticSearchQuery(String query, String sortField, SortOrder sortOrder) {
ElasticSearchQuery(String query, String sortField, SortOrder sortOrder, List<Integer> shards) {
this.query = query;
this.sortField = sortField;
this.sortOrder = sortOrder;
this.shards = Collections.unmodifiableList(shards);
}

public String getQuery() {
Expand All @@ -24,4 +32,12 @@ public SortOrder getSortOrder() {
/**
 * @return the sort field this query was created with
 */
public String getSortField() {
return sortField;
}

/**
 * Returns the shards this query is restricted to.
 *
 * @return the unmodifiable shard list stored at construction time
 */
public List<Integer> getShards() {
    return shards;
}

/**
 * Creates a new query with the same query text, sort field and sort order as this one but
 * restricted to the given shards. This instance is left untouched.
 *
 * @param shards shard numbers for the new query; must not be {@code null}
 * @return a new {@code ElasticSearchQuery} carrying {@code shards}
 */
public ElasticSearchQuery withShards(List<Integer> shards) {
    // The constructor already wraps the list in Collections.unmodifiableList,
    // so wrapping it here as well would only stack a second read-only view.
    return new ElasticSearchQuery(query, sortField, sortOrder, shards);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import com.google.common.base.Strings;
import org.elasticsearch.search.sort.SortOrder;

import java.util.Collections;
import java.util.List;

public class ElasticSearchQueryBuilder {
private String query;
private String sortField;
private SortOrder sortOrder = SortOrder.ASC;
private List<Integer> shards = Collections.emptyList();


private ElasticSearchQueryBuilder() {
Expand All @@ -33,8 +37,17 @@ public ElasticSearchQueryBuilder setSortByField(String orderByField) {
return this;
}

/**
 * Sets the shards the built query should be restricted to.
 *
 * @param shards shard numbers; {@code null} is treated as "no shards" (empty list)
 * @return this builder, for chaining
 */
public ElasticSearchQueryBuilder setShards(List<Integer> shards) {
    this.shards = (shards != null) ? shards : Collections.<Integer>emptyList();
    return this;
}

/**
 * Creates an {@code ElasticSearchQuery} from the values currently held by this builder.
 *
 * NOTE(review): two return statements follow (without and with {@code shards}) - looks like the
 * old and new side of a diff hunk; confirm that only the four-argument call should remain.
 *
 * @return the newly constructed query
 */
public ElasticSearchQuery build() {
return new ElasticSearchQuery(query, sortField, sortOrder);
return new ElasticSearchQuery(query, sortField, sortOrder, shards);
}

public static ElasticSearchQueryBuilder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public Optional<BulkResult> indexData(ElasticDataPointer targetDataPointer, Sear
for (SearchHit hit : hits) {
Map<String, Object> source = hit.getSource();
IndexRequestBuilder requestBuilder = prepareIndex(targetDataPointer.getIndexName(), targetDataPointer
.getTypeName(), hit.getId());
.getTypeName(), hit.getId());
if (hit.getFields().get("_ttl") != null) {
requestBuilder.setTTL(hit.getFields().get("_ttl").value());
requestBuilder.setTTL(hit.getFields().get("_ttl").<Long>value());
}
if (hit.getFields().get("_routing") != null) {
requestBuilder.setRouting(hit.getFields().get("_routing").value());
requestBuilder.setRouting(hit.getFields().get("_routing").<String>value());
}
requestBuilder.setSource(source);
bulkRequest.add(requestBuilder);
Expand All @@ -49,9 +49,9 @@ private Optional<BulkResult> executeBulk(int indexedCount, BulkRequestBuilder bu
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
Set<String> failedIds = Stream.of(bulkItemResponses.getItems())
.filter(BulkItemResponse::isFailed)
.map(BulkItemResponse::getId)
.collect(Collectors.toSet());
.filter(BulkItemResponse::isFailed)
.map(BulkItemResponse::getId)
.collect(Collectors.toSet());
return Optional.of(new BulkResult(indexedCount, failedIds));
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.filter.BoundedFilterFactory;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class QueryComponent {

Expand All @@ -37,29 +39,33 @@ public class QueryComponent {

/**
 * Builds and executes the initial scan/scroll search against the index and type of
 * {@code dataPointer}, requesting the {@code _ttl} and {@code _source} fields.
 * Optional parts are applied in this order: the query text, a sort on the configured field,
 * a {@code _shards:} search preference listing the query's shards, and finally the bounded
 * filter created for {@code segmentationField} when a bound is present.
 *
 * @return the response of the executed search request (blocks until it is available)
 */
public SearchResponse prepareSearchScrollRequest() {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(dataPointer.getIndexName())
.setTypes(dataPointer.getTypeName())
.setSearchType(SearchType.SCAN)
.addFields("_ttl", "_source")
.setScroll(new TimeValue(SCROLL_TIME_LIMIT))
.setSize(SCROLL_SHARD_LIMIT);
// NOTE(review): the five builder calls below repeat the five above - looks like both sides of a
// whitespace-only diff hunk; confirm and keep a single copy.
.setTypes(dataPointer.getTypeName())
.setSearchType(SearchType.SCAN)
.addFields("_ttl", "_source")
.setScroll(new TimeValue(SCROLL_TIME_LIMIT))
.setSize(SCROLL_SHARD_LIMIT);

if (!Strings.isNullOrEmpty(query.getQuery())) {
searchRequestBuilder.setQuery(query.getQuery());
}
if (!Strings.isNullOrEmpty(query.getSortField())) {
searchRequestBuilder.addSort(new FieldSortBuilder(query.getSortField()).order(query.getSortOrder()));
}
// Restrict the search to the listed shards by joining their numbers with commas
// into a "_shards:<n>,<m>,..." preference string.
if (query.getShards() != null && !query.getShards().isEmpty()) {
String joinedShards = String.join(",", query.getShards().stream().map(shard -> shard.toString()).collect(Collectors.toList()));
searchRequestBuilder.setPreference("_shards:" + joinedShards);
}

// NOTE(review): when a bound is present this calls setQuery a second time; presumably that
// replaces the query text set above rather than combining with it - confirm this is intended.
bound.map(resolvedBound -> boundedFilterFactory.createBoundedFilter(segmentationField.get(), resolvedBound))
.ifPresent(searchRequestBuilder::setQuery);
.ifPresent(searchRequestBuilder::setQuery);

return searchRequestBuilder.execute().actionGet();
}

/**
 * Fetches the next batch of an ongoing scroll, renewing the scroll with {@code SCROLL_TIMEOUT}.
 *
 * @param scrollId identifier of the scroll to continue
 * @return the response of the scroll request
 */
public SearchResponse getNextScrolledSearchResults(String scrollId) {
return client.prepareSearchScroll(scrollId)
.setScroll(new TimeValue(SCROLL_TIMEOUT))
.get();
.setScroll(new TimeValue(SCROLL_TIMEOUT))
.get();
}

int getResponseSize(SearchResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.BoundedSegment;
import org.elasticsearch.client.Client;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public final class QueryComponentBuilder {
Expand Down Expand Up @@ -42,7 +44,6 @@ public QueryComponentBuilder setQuery(ElasticSearchQuery query) {
return this;
}


/**
 * Static entry point for the builder.
 *
 * @return a new {@code QueryComponentBuilder} instance
 */
public static QueryComponentBuilder builder() {
return new QueryComponentBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -12,8 +13,8 @@ public final class DoubleFieldSegmentation extends SegmentationQueryTrait implem

private final Optional<String> fieldName;

/**
 * Private constructor, reached through {@code create}. Passes the query (and the by-shards flag)
 * to the superclass, stores the field name as a present {@code Optional} and keeps the thresholds
 * behind an unmodifiable view.
 *
 * NOTE(review): two signature lines and two {@code super(...)} calls follow - looks like the old
 * and new side of a diff hunk; confirm that only the four-argument form should remain.
 */
private DoubleFieldSegmentation(String fieldName, List<Double> thresholds, ElasticSearchQuery query) {
super(query);
private DoubleFieldSegmentation(String fieldName, List<Double> thresholds, ElasticSearchQuery query, Boolean segmentationByShards) {
super(query, segmentationByShards);
this.fieldName = Optional.of(fieldName);
this.thresholds = Collections.unmodifiableList(thresholds);
}
Expand All @@ -25,20 +26,39 @@ public Optional<String> getFieldName() {

/**
 * Number of segments the reindexing is split into.
 * There is one segment per pair of neighbouring thresholds; when segmentation by shards is on
 * and the query names more than one shard, that number is multiplied by the shard count.
 *
 * @return the total segment count
 */
@Override
public int getSegmentsCount() {
    final int rangeCount = thresholds.size() - 1;
    final boolean splitByShards =
            this.getSegmentationByShards() && this.getQuery().getShards().size() > 1;
    return splitByShards ? rangeCount * this.getQuery().getShards().size() : rangeCount;
}

/**
 * Builds the range segment for segment number {@code i}.
 * With segmentation by shards active (flag set and more than one shard in the query) the
 * segment number is reduced modulo the number of threshold ranges, so the same ranges are
 * produced again for every shard.
 *
 * @param i segment number
 * @return a present {@code Optional} holding the range between two neighbouring thresholds
 */
@Override
public Optional<BoundedSegment> getThreshold(int i) {
int thresholdIndex = i;
if (getSegmentationByShards() && this.getQuery().getShards().size() > 1) {
thresholdIndex = i % (thresholds.size() - 1);
}

RangeSegment segmentation =
// NOTE(review): two builder chains follow, one indexing with i and one with thresholdIndex -
// looks like the old and new side of a diff hunk; confirm that only the thresholdIndex one stays.
RangeSegmentBuilder.builder()
.setLowerOpenBound(thresholds.get(i))
.setUpperBound(thresholds.get(i + 1))
.createRangeSegment();
RangeSegmentBuilder.builder()
.setLowerOpenBound(thresholds.get(thresholdIndex))
.setUpperBound(thresholds.get(thresholdIndex + 1))
.createRangeSegment();

return Optional.of(segmentation);
}

public static DoubleFieldSegmentation create(String fieldName, List<Double> thresholds, ElasticSearchQuery query) {
return new DoubleFieldSegmentation(fieldName, thresholds, query);
/**
 * Returns the query to run for segment number {@code i}.
 * Without shard segmentation (flag off, or at most one shard in the query) this is the base
 * query. Otherwise it is a copy of the base query restricted to a single shard, chosen by
 * dividing the segment number by the number of threshold ranges.
 *
 * @param i segment number
 * @return the query for that segment
 */
@Override
public ElasticSearchQuery getQuery(int i) {
    final ElasticSearchQuery baseQuery = getQuery();
    if (!getSegmentationByShards() || baseQuery.getShards().size() <= 1) {
        return baseQuery;
    }

    final int shardPosition = i / (thresholds.size() - 1);
    final Integer shard = baseQuery.getShards().get(shardPosition);
    return baseQuery.withShards(Collections.singletonList(shard));
}

/**
 * Static factory delegating to the private constructor.
 *
 * @param fieldName            name of the field the segmentation is based on
 * @param thresholds           threshold values; neighbouring pairs form the segments
 * @param query                query shared by the segments
 * @param segmentationByShards flag forwarded to the superclass constructor
 * @return a new {@code DoubleFieldSegmentation}
 */
public static DoubleFieldSegmentation create(String fieldName, List<Double> thresholds, ElasticSearchQuery query, Boolean segmentationByShards) {
return new DoubleFieldSegmentation(fieldName, thresholds, query, segmentationByShards);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
public final class EmptySegmentation extends SegmentationQueryTrait implements QuerySegmentation {

/**
 * Private constructor; hands the query to the superclass.
 *
 * NOTE(review): two {@code super(...)} calls follow - looks like the old and new side of a diff
 * hunk; the two-argument call passes {@code false} for the by-shards flag. Confirm which stays.
 */
private EmptySegmentation(ElasticSearchQuery query) {
super(query);
super(query, false);
}

@Override
Expand All @@ -25,6 +25,11 @@ public Optional<BoundedSegment> getThreshold(int i) {
return Optional.empty();
}

/**
 * Returns the query for segment number {@code i}; the segment number is ignored and the
 * base query is returned as is.
 *
 * @param i segment number (unused)
 * @return the base query
 */
@Override
public ElasticSearchQuery getQuery(int i) {
    return getQuery();
}

/**
 * Static factory delegating to the private constructor.
 *
 * @param query query handed to the new instance
 * @return a new {@code EmptySegmentation}
 */
public static EmptySegmentation createEmptySegmentation(ElasticSearchQuery query) {
return new EmptySegmentation(query);
}
Expand Down
Loading