Skip to content

Commit 8d66bf0

Browse files
committed
Merge branch 'disable_cluster_nodes_sniffing' of https://github.com/centic9/elasticsearch-reindex-tool into centic9-disable_cluster_nodes_sniffing
2 parents 248c463 + ca8596b commit 8d66bf0

File tree

9 files changed

+100
-13
lines changed

9 files changed

+100
-13
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ Options:
8686
Target f.e. http://localhost:9300/target_index/type
8787
-tc, target-cluster
8888
Target cluster name
89+
-disable-cluster-sniffing
90+
Don't try to determine additional cluster nodes (e.g. when your network
91+
only allows access to one of the nodes)
92+
Default: false
8993
-segmentationField
9094
Segmentation field
9195
-segmentationPrefixes
@@ -102,6 +106,10 @@ Options:
102106
`segmentationField`, `segmentationThreshold` and `segmentationPrefixes` are optional parameters, allowing to spread
103107
querying for field with double values or prefix for string field
104108

109+
`disable-cluster-sniffing` allows to work in cases where the network-setup makes it impossible to connect to all nodes
110+
of the source or target cluster. Note that it may lead to slightly reduced reindexing rates as data can only be sent
111+
via one node then.
112+
105113
During reindex process progress message is prompted after each scroll query.
106114

107115
Example of progress message with the time how long it lasts, number of items queried and indexed, occupancy of queue, number of concurrent reader threads and number of failed document indexing:

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexCommandParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
import com.beust.jcommander.JCommander;
44
import com.beust.jcommander.ParameterException;
5+
56
import pl.allegro.tech.search.elasticsearch.tools.reindex.command.ReindexCommand;
67
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
78
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerBuilder;
8-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQuery;
9-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticSearchQueryBuilder;
109
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ParsingElasticsearchAddressException;
1110
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentation;
1211
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentationFactory;
@@ -40,10 +39,12 @@ private void buildReindexParameters(ReindexCommand command) {
4039
sourcePointer = ElasticDataPointerBuilder.builder()
4140
.setClusterName(command.getSourceClusterName())
4241
.setAddress(command.getSource())
42+
.setSniff(!command.isDisableSniff())
4343
.build();
4444
targetPointer = ElasticDataPointerBuilder.builder()
4545
.setClusterName(command.getTargetClusterName())
4646
.setAddress(command.getTarget())
47+
.setSniff(!command.isDisableSniff())
4748
.build();
4849
segmentation = getFieldSegmentation(command);
4950
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/command/ReindexCommand.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex.command;
22

3-
import com.beust.jcommander.Parameter;
4-
53
import java.util.List;
64

5+
import com.beust.jcommander.Parameter;
6+
77
public class ReindexCommand {
88

99
@Parameter(names = { "-s", "source" }, description = "Source f.e. http://localhost:9300/source_index/type",
@@ -37,6 +37,9 @@ public class ReindexCommand {
3737
@Parameter(names = { "-segmentationPrefixes" }, description = "Segmentation prefixes (comma-separated)")
3838
private List<String> segmentationPrefixes;
3939

40+
@Parameter(names = { "-disable-cluster-sniffing" }, description = "Don't try to determine additional cluster nodes (e.g. when your network only allows access to one of the nodes)")
41+
private boolean disableSniff;
42+
4043
public String getSourceClusterName() {
4144
return sourceClusterName;
4245
}
@@ -74,4 +77,8 @@ public String getSource() {
7477
public String getTarget() {
7578
return target;
7679
}
80+
81+
public boolean isDisableSniff() {
82+
return disableSniff;
83+
}
7784
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/connection/ElasticDataPointer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ public class ElasticDataPointer {
77
private final String indexName;
88
private final String typeName;
99
private final int port;
10+
private final boolean sniff;
1011

11-
ElasticDataPointer(String host, String clusterName, String indexName, String typeName, int port) {
12+
ElasticDataPointer(String host, String clusterName, String indexName, String typeName, int port, boolean sniff) {
1213
this.host = host;
1314
this.clusterName = clusterName;
1415
this.indexName = indexName;
1516
this.typeName = typeName;
1617
this.port = port;
18+
this.sniff = sniff;
1719
}
1820

1921
public String getHost() {
@@ -35,4 +37,8 @@ public String getTypeName() {
3537
public int getPort() {
3638
return port;
3739
}
40+
41+
public boolean isSniff() {
42+
return sniff;
43+
}
3844
}

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/connection/ElasticDataPointerBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ public class ElasticDataPointerBuilder {
66

77
private String clusterName = "elasticsearch";
88
private ElasticAddress address;
9+
private boolean sniff = true;
910

1011
private ElasticDataPointerBuilder() {
1112
}
@@ -20,9 +21,14 @@ public ElasticDataPointerBuilder setClusterName(String clusterName) {
2021
return this;
2122
}
2223

24+
public ElasticDataPointerBuilder setSniff(boolean sniff) {
25+
this.sniff = sniff;
26+
return this;
27+
}
28+
2329
public ElasticDataPointer build() {
2430
return new ElasticDataPointer(address.getHost(), clusterName, address.getIndexName(), address.getTypeName(),
25-
address.getPort());
31+
address.getPort(), sniff);
2632
}
2733

2834
public static ElasticDataPointerBuilder builder() {

src/main/java/pl/allegro/tech/search/elasticsearch/tools/reindex/connection/ElasticSearchClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ private ElasticSearchClientFactory() {
1515

1616
public static Client createClient(ElasticDataPointer elasticDataPointer) {
1717
Settings settings = Settings.settingsBuilder()
18-
.put("client.transport.sniff", true)
18+
.put("client.transport.sniff", elasticDataPointer.isSniff())
1919
.put(ClusterName.SETTING, elasticDataPointer.getClusterName())
2020
.build();
2121
TransportClient client = TransportClient.builder().settings(settings).build();

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/ReindexCommandParserTest.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,20 @@
11
package pl.allegro.tech.search.elasticsearch.tools.reindex;
22

3-
import jdk.nashorn.internal.parser.JSONParser;
43
import junitparams.JUnitParamsRunner;
54
import junitparams.Parameters;
6-
import org.elasticsearch.index.mapper.ObjectMappers;
7-
import org.elasticsearch.index.mapper.object.ObjectMapper;
85
import org.elasticsearch.search.sort.SortOrder;
96
import org.junit.Assert;
107
import org.junit.Test;
118
import org.junit.runner.RunWith;
12-
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerAssert;
139
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.PrefixSegment;
1410
import pl.allegro.tech.search.elasticsearch.tools.reindex.query.RangeSegment;
1511

1612
import java.util.Optional;
1713

1814
import static pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerAssert.assertThat;
15+
import static pl.allegro.tech.search.elasticsearch.tools.reindex.query.PrefixSegmentAssert.assertThat;
1916
import static pl.allegro.tech.search.elasticsearch.tools.reindex.query.QuerySegmentationAssert.assertThat;
2017
import static pl.allegro.tech.search.elasticsearch.tools.reindex.query.RangeSegmentAssert.assertThat;
21-
import static pl.allegro.tech.search.elasticsearch.tools.reindex.query.PrefixSegmentAssert.assertThat;
2218

2319
@RunWith(JUnitParamsRunner.class)
2420
public class ReindexCommandParserTest {
@@ -217,6 +213,39 @@ public void parseCommandWithSortOrderDefaultASC() {
217213
Assert.assertEquals(SortOrder.ASC, commandParser.getSegmentation().getQuery().getSortOrder());
218214
}
219215

216+
@Test
217+
public void parseDisableClusterSniffing() {
218+
//given
219+
ReindexCommandParser commandParser = new ReindexCommandParser();
220+
//when
221+
boolean result = commandParser.tryParse(createArgumentArray(
222+
"-sc", "sourceClusterName",
223+
"-tc", "targetClusterName",
224+
"-s", "http://sourceHost1:9333/source_index/source_type",
225+
"-t", "http://targetHost1:9333/target_index/target_type",
226+
"-disable-cluster-sniffing"
227+
));
228+
//then
229+
Assert.assertFalse(commandParser.getSourcePointer().isSniff());
230+
Assert.assertFalse(commandParser.getTargetPointer().isSniff());
231+
}
232+
233+
@Test
234+
public void parseWhenNotDisabledClusterSniffing() {
235+
//given
236+
ReindexCommandParser commandParser = new ReindexCommandParser();
237+
//when
238+
boolean result = commandParser.tryParse(createArgumentArray(
239+
"-sc", "sourceClusterName",
240+
"-tc", "targetClusterName",
241+
"-s", "http://sourceHost1:9333/source_index/source_type",
242+
"-t", "http://targetHost1:9333/target_index/target_type"
243+
));
244+
//then
245+
Assert.assertTrue(commandParser.getSourcePointer().isSniff());
246+
Assert.assertTrue(commandParser.getTargetPointer().isSniff());
247+
248+
}
220249

221250
private String[] createArgumentArray(String... args) {
222251
return args;

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/connection/ElasticSearchClientProducerTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,35 @@ public void validateCreatedLocalElasticClientWithProperClusterName() throws Exce
4545
client.close();
4646
}
4747

48+
49+
@Test
50+
public void validateCreatedLocalElasticClientWithoutSniff() throws Exception {
51+
//given
52+
ElasticDataPointer dataPointer = ElasticDataPointerBuilder.builder()
53+
.setAddress("http://localhost:9300/"+INDEX_NAME+"/type")
54+
.setClusterName(CLUSTER_NAME)
55+
.setSniff(false)
56+
.build();
57+
//when
58+
Client client = ElasticSearchClientFactory.createClient(dataPointer);
59+
//then
60+
Assertions.assertThat(client.settings().get("cluster.name")).isEqualTo(CLUSTER_NAME);
61+
client.close();
62+
}
63+
64+
@Test
65+
public void validateCreatedLocalElasticClientWithSniff() throws Exception {
66+
//given
67+
ElasticDataPointer dataPointer = ElasticDataPointerBuilder.builder()
68+
.setAddress("http://localhost:9300/"+INDEX_NAME+"/type")
69+
.setClusterName(CLUSTER_NAME)
70+
.setSniff(true)
71+
.build();
72+
//when
73+
Client client = ElasticSearchClientFactory.createClient(dataPointer);
74+
//then
75+
Assertions.assertThat(client.settings().get("cluster.name")).isEqualTo(CLUSTER_NAME);
76+
client.close();
77+
}
78+
4879
}

src/test/java/pl/allegro/tech/search/elasticsearch/tools/reindex/embeded/EmbeddedElasticsearchCluster.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.common.xcontent.XContentBuilder;
1212
import org.elasticsearch.node.Node;
1313
import org.elasticsearch.node.NodeBuilder;
14-
import org.elasticsearch.search.sort.SortBuilder;
1514
import pl.allegro.tech.search.elasticsearch.tools.reindex.ReindexInvokerTest;
1615
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointer;
1716
import pl.allegro.tech.search.elasticsearch.tools.reindex.connection.ElasticDataPointerBuilder;

0 commit comments

Comments
 (0)