Skip to content

Commit a4182e1

Browse files
committed
Full cluster replication POC
1 parent 40eb0bd commit a4182e1

File tree

56 files changed

+2267
-34
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2267
-34
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,18 @@
439439
import org.opensearch.rest.action.search.RestMultiSearchAction;
440440
import org.opensearch.rest.action.search.RestSearchAction;
441441
import org.opensearch.rest.action.search.RestSearchScrollAction;
442+
import org.opensearch.xreplication.actions.index.StartIndexTaskAction;
443+
import org.opensearch.xreplication.actions.index.TransportStartIndexTaskAction;
444+
import org.opensearch.xreplication.actions.start.RestXReplicateAction;
445+
import org.opensearch.xreplication.actions.start.StartXReplication;
446+
import org.opensearch.xreplication.actions.start.TransportXReplicateAction;
442447
import org.opensearch.tasks.Task;
443448
import org.opensearch.threadpool.ThreadPool;
444449
import org.opensearch.usage.UsageService;
450+
import org.opensearch.xreplication.actions.followers.StartFollowersAction;
451+
import org.opensearch.xreplication.actions.followers.TransportStartFollowerAction;
452+
import org.opensearch.xreplication.actions.syncsegments.SyncFromLeaderAction;
453+
import org.opensearch.xreplication.actions.syncsegments.TransportSyncFromLeaderAction;
445454

446455
import java.util.ArrayList;
447456
import java.util.Collections;
@@ -731,6 +740,14 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
731740
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
732741
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
733742
actions.register(DeleteDecommissionStateAction.INSTANCE, TransportDeleteDecommissionStateAction.class);
743+
744+
745+
// Full Cluster Replication
746+
actions.register(StartXReplication.INSTANCE, TransportXReplicateAction.class);
747+
actions.register(StartFollowersAction.INSTANCE, TransportStartFollowerAction.class);
748+
actions.register(StartIndexTaskAction.INSTANCE, TransportStartIndexTaskAction.class);
749+
actions.register(SyncFromLeaderAction.INSTANCE, TransportSyncFromLeaderAction.class);
750+
734751
return unmodifiableMap(actions.getRegistry());
735752
}
736753

@@ -915,6 +932,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
915932
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
916933
registerHandler.accept(new RestDeleteDecommissionStateAction());
917934

935+
// xcluster
936+
registerHandler.accept(new RestXReplicateAction());
937+
918938
for (ActionPlugin plugin : actionPlugins) {
919939
for (RestHandler handler : plugin.getRestHandlers(
920940
settings,

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,16 @@ public Iterator<Setting<?>> settings() {
495495
Property.IndexScope,
496496
Property.Final
497497
);
498+
public static final String CCR_REPLICATING_FROM_INDEX = "index.replication.fcr.replicating_from";
499+
public static final Setting<String> CCR_REPLICATING_FROM_INDEX_SETTING = new Setting<>(CCR_REPLICATING_FROM_INDEX, "", Function.identity(), Property.IndexScope);
500+
501+
public static final String CCR_REMOTE_PATH = "index.replication.fcr.remote_path";
502+
public static final Setting<String> CCR_REMOTE_PATH_SETTING = new Setting<>(
503+
CCR_REMOTE_PATH,
504+
"",
505+
Function.identity(),
506+
Property.IndexScope
507+
);
498508

499509
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
500510
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
196196
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
197197
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
198198
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
199+
IndexMetadata.CCR_REPLICATING_FROM_INDEX_SETTING,
199200

200201
// validate that built-in similarities don't get redefined
201202
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ public class FeatureFlags {
2727
*/
2828
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";
2929

30+
/**
31+
* Gates the functionality of full cluster replication.
32+
* Once the feature is ready for production release, this feature flag can be removed.
33+
*/
34+
public static final String X_REPLICATION = "opensearch.experimental.feature.full_cluster_replication.enabled";
35+
3036
/**
3137
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
3238
* Once the feature is ready for production release, this feature flag can be removed.
@@ -75,6 +81,7 @@ public static void initializeFeatureFlags(Settings openSearchSettings) {
7581
* and false otherwise.
7682
*/
7783
public static boolean isEnabled(String featureFlagName) {
84+
if (REPLICATION_TYPE.equals(featureFlagName) || REMOTE_STORE.equals(featureFlagName) || X_REPLICATION.equals(featureFlagName)) return true;
7885
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
7986
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
8087
return true;

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.function.Function;
6363
import java.util.function.UnaryOperator;
6464

65+
import static org.opensearch.cluster.metadata.IndexMetadata.CCR_REMOTE_PATH_SETTING;
6566
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
6667
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
6768
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
@@ -587,6 +588,8 @@ public final class IndexSettings {
587588
private final int numberOfShards;
588589
private final ReplicationType replicationType;
589590
private final boolean isRemoteStoreEnabled;
591+
private final boolean isCCRReplicatingIndex;
592+
private final String ccrReplicatingFrom;
590593
private final boolean isRemoteTranslogStoreEnabled;
591594
private final TimeValue remoteTranslogUploadBufferInterval;
592595
private final String remoteStoreTranslogRepository;
@@ -761,6 +764,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
761764
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
762765
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
763766
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
767+
ccrReplicatingFrom = settings.get(IndexMetadata.CCR_REPLICATING_FROM_INDEX, "");
768+
isCCRReplicatingIndex = !Strings.isNullOrEmpty(ccrReplicatingFrom);
764769
remoteTranslogUploadBufferInterval = settings.getAsTime(
765770
IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
766771
TimeValue.timeValueMillis(100)
@@ -1024,6 +1029,13 @@ public boolean isRemoteStoreEnabled() {
10241029
return isRemoteStoreEnabled;
10251030
}
10261031

1032+
public boolean isCCRReplicatingIndex() {
1033+
return isCCRReplicatingIndex;
1034+
}
1035+
1036+
public String getCCRReplicatingFrom() {
1037+
return ccrReplicatingFrom;
1038+
}
10271039
/**
10281040
* Returns remote store repository configured for this index.
10291041
*/
@@ -1565,4 +1577,8 @@ private void setMergeOnFlushPolicy(String policy) {
15651577
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
15661578
return Optional.ofNullable(mergeOnFlushPolicy);
15671579
}
1580+
1581+
public String getCCRRemotePath() {
1582+
return CCR_REMOTE_PATH_SETTING.get(settings);
1583+
}
15681584
}

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,12 @@ public boolean isReadOnlyReplica() {
408408
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
409409
}
410410

411+
412+
public boolean isReadOnlyPrimary() {
413+
//TODO: verify on segrep enabled.
414+
return indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled() && indexSettings.isCCRReplicatingIndex() && !isReadOnlyReplica;
415+
}
416+
411417
/**
412418
* Returns the underlying primaryModeSupplier.
413419
* @return the primary mode supplier.

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ public InternalTranslogManager translogManager() {
256256
throttle = new IndexThrottle();
257257
try {
258258
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
259+
//TODO: fix this. Reading translogUUID from segments won't work if the segments are from a different cluster(CCR).
259260
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
260261
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
261262
TranslogEventListener internalTranslogEventListener = new TranslogEventListener() {

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
public class NRTReplicationEngineFactory implements EngineFactory {
1818
@Override
1919
public Engine newReadWriteEngine(EngineConfig config) {
20-
if (config.isReadOnlyReplica()) {
20+
if (config.isReadOnlyReplica() || config.isReadOnlyPrimary()) {
2121
return new NRTReplicationEngine(config);
2222
}
2323
return new InternalEngine(config);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
import org.opensearch.index.store.StoreStats;
163163
import org.opensearch.index.translog.Translog;
164164
import org.opensearch.index.translog.TranslogConfig;
165+
import org.opensearch.index.translog.TranslogCorruptedException;
165166
import org.opensearch.index.translog.TranslogFactory;
166167
import org.opensearch.index.translog.TranslogStats;
167168
import org.opensearch.index.warmer.ShardIndexWarmerService;
@@ -2185,7 +2186,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21852186
assert currentEngineReference.get() == null : "engine is running";
21862187
verifyNotClosed();
21872188
if (indexSettings.isRemoteStoreEnabled()) {
2188-
syncSegmentsFromRemoteSegmentStore(false, true, false);
2189+
syncSegmentsFromRemoteSegmentStore(false, true, true);
21892190
}
21902191
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
21912192
final Engine newEngine = engineFactory.newReadWriteEngine(config);
@@ -3502,10 +3503,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
35023503

35033504
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
35043505
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
3505-
if (isRemoteStoreEnabled()) {
3506+
if (isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
35063507
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher));
35073508
}
3508-
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled()) {
3509+
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
35093510
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
35103511
}
35113512

@@ -4352,7 +4353,7 @@ public void close() throws IOException {
43524353
};
43534354
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
43544355
if (indexSettings.isRemoteStoreEnabled()) {
4355-
syncSegmentsFromRemoteSegmentStore(false, true, false);
4356+
syncSegmentsFromRemoteSegmentStore(false, true, true);
43564357
}
43574358
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43584359
onNewEngine(newEngineReference.get());
@@ -4456,13 +4457,14 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
44564457
indexInput,
44574458
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
44584459
);
4460+
44594461
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
44604462
if (shouldCommit) {
4461-
finalizeReplication(infosSnapshot);
4462-
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
4463+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
44634464
}
44644465
else {
4465-
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
4466+
finalizeReplication(infosSnapshot);
4467+
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
44664468
}
44674469
}
44684470
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.opensearch.common.inject.Inject;
1212
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
1313
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
14-
14+
import org.opensearch.xreplication.actions.notifysecondary.NotifySecondariesAction;
1515

1616
/**
1717
* Hook to publish notification after primary uploads segments to the remote store.
@@ -20,18 +20,21 @@
2020
*/
2121
public class RemoteStoreSegmentUploadNotificationPublisher {
2222
private final SegmentReplicationCheckpointPublisher segRepPublisher;
23+
private final NotifySecondariesAction xReplicatePublisher;
2324

2425
@Inject
25-
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher) {
26+
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher, NotifySecondariesAction xReplicatePublisher) {
2627
this.segRepPublisher = segRepPublisher;
28+
this.xReplicatePublisher = xReplicatePublisher;
2729
}
2830

2931
public void notifySegmentUpload(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
30-
// TODO: Add separate publisher for CCR.
3132
// we don't call indexShard.getLatestReplicationCheckpoint() as it might have a newer refreshed checkpoint.
3233
// Instead we send the one which has been uploaded to remote store.
34+
// TODO: Parallise both the notifications.
3335
if (segRepPublisher != null) segRepPublisher.publish(indexShard, checkpoint);
36+
if (xReplicatePublisher != null) xReplicatePublisher.publish(indexShard, checkpoint);
3437
}
3538

36-
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null);
39+
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null, null);
3740
}

0 commit comments

Comments
 (0)