Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,8 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
null,
RemoteStoreSegmentUploadNotificationPublisher.EMPTY
);
}

Expand Down
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,18 @@
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
import org.opensearch.xreplication.actions.index.StartCCRIndexTaskAction;
import org.opensearch.xreplication.actions.index.TransportStartCCRIndexTaskAction;
import org.opensearch.xreplication.actions.start.RestStartCCRAction;
import org.opensearch.xreplication.actions.start.StartCCRAction;
import org.opensearch.xreplication.actions.start.TransportStartCCRAction;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
import org.opensearch.xreplication.actions.followers.StartCCRFollowerTaskAction;
import org.opensearch.xreplication.actions.followers.TransportStartCCRFollowerTaskAction;
import org.opensearch.xreplication.actions.syncsegments.SyncFromLeaderAction;
import org.opensearch.xreplication.actions.syncsegments.TransportSyncFromLeaderAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -731,6 +740,14 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
actions.register(DeleteDecommissionStateAction.INSTANCE, TransportDeleteDecommissionStateAction.class);


// Transport actions for Full Cluster Replication
actions.register(StartCCRAction.INSTANCE, TransportStartCCRAction.class);
actions.register(StartCCRFollowerTaskAction.INSTANCE, TransportStartCCRFollowerTaskAction.class);
actions.register(StartCCRIndexTaskAction.INSTANCE, TransportStartCCRIndexTaskAction.class);
actions.register(SyncFromLeaderAction.INSTANCE, TransportSyncFromLeaderAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down Expand Up @@ -915,6 +932,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
registerHandler.accept(new RestDeleteDecommissionStateAction());

// REST handlers for full cluster replication (xreplication)
registerHandler.accept(new RestStartCCRAction());

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,16 @@ public Iterator<Setting<?>> settings() {
Property.IndexScope,
Property.Final
);
/**
 * Key of the index-scoped setting read by full cluster replication to tell which index
 * this index is replicating from. An empty value (the default) means the setting is unset.
 */
public static final String CCR_REPLICATING_FROM_INDEX = "index.replication.fcr.replicating_from";
public static final Setting<String> CCR_REPLICATING_FROM_INDEX_SETTING = new Setting<>(
    CCR_REPLICATING_FROM_INDEX,
    "",
    Function.identity(),
    Property.IndexScope
);

/**
 * Key of the index-scoped setting holding the remote path used by full cluster replication.
 * Defaults to the empty string.
 * NOTE(review): confirm this setting is also registered with the built-in index-scoped settings.
 */
public static final String CCR_REMOTE_PATH = "index.replication.fcr.remote_path";
public static final Setting<String> CCR_REMOTE_PATH_SETTING = new Setting<>(
    CCR_REMOTE_PATH,
    "",
    Function.identity(),
    Property.IndexScope
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
IndexMetadata.CCR_REPLICATING_FROM_INDEX_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public class FeatureFlags {
*/
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";

/**
* Gates the functionality of full cluster replication.
* Once the feature is ready for production release, this feature flag can be removed.
*/
public static final String X_REPLICATION = "opensearch.experimental.feature.full_cluster_replication.enabled";

/**
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
* Once the feature is ready for production release, this feature flag can be removed.
Expand Down Expand Up @@ -75,6 +81,7 @@ public static void initializeFeatureFlags(Settings openSearchSettings) {
* and false otherwise.
*/
public static boolean isEnabled(String featureFlagName) {
if (REPLICATION_TYPE.equals(featureFlagName) || REMOTE_STORE.equals(featureFlagName) || X_REPLICATION.equals(featureFlagName)) return true;
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
return true;
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -438,7 +439,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -506,7 +508,8 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
remoteStore,
remoteSegmentNotificationPublisher
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.opensearch.cluster.metadata.IndexMetadata.CCR_REMOTE_PATH_SETTING;
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
Expand Down Expand Up @@ -587,6 +588,8 @@ public final class IndexSettings {
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final boolean isCCRReplicatingIndex;
private final String ccrReplicatingFrom;
private final boolean isRemoteTranslogStoreEnabled;
private final TimeValue remoteTranslogUploadBufferInterval;
private final String remoteStoreTranslogRepository;
Expand Down Expand Up @@ -761,6 +764,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
ccrReplicatingFrom = settings.get(IndexMetadata.CCR_REPLICATING_FROM_INDEX, "");
isCCRReplicatingIndex = !Strings.isNullOrEmpty(ccrReplicatingFrom);
remoteTranslogUploadBufferInterval = settings.getAsTime(
IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
TimeValue.timeValueMillis(100)
Expand Down Expand Up @@ -1024,6 +1029,13 @@ public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

/**
 * Returns whether this index is a CCR replicating index.
 * The flag is computed once at construction time and is {@code true} when the
 * {@code index.replication.fcr.replicating_from} setting is neither null nor empty.
 *
 * @return {@code true} if a replicating-from index is configured, {@code false} otherwise
 */
public boolean isCCRReplicatingIndex() {
    return this.isCCRReplicatingIndex;
}

/**
 * Returns the value of the {@code index.replication.fcr.replicating_from} setting as
 * captured at construction time.
 *
 * @return the configured value, or the empty string when the setting is absent
 */
public String getCCRReplicatingFrom() {
    return this.ccrReplicatingFrom;
}
/**
* Returns remote store repository configured for this index.
*/
Expand Down Expand Up @@ -1565,4 +1577,8 @@ private void setMergeOnFlushPolicy(String policy) {
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
return Optional.ofNullable(mergeOnFlushPolicy);
}

/**
 * Returns the CCR remote path configured for this index.
 * The value is resolved from the current index settings on every call via
 * {@code CCR_REMOTE_PATH_SETTING} rather than being cached in a field.
 *
 * @return the value of the CCR remote path setting for this index
 */
public String getCCRRemotePath() {
    final String remotePath = CCR_REMOTE_PATH_SETTING.get(settings);
    return remotePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}


/**
 * Returns whether this config describes a read-only primary.
 * All of the following must hold: segment replication is enabled, remote store is enabled,
 * the index is a CCR replicating index, and this config is not flagged as a read-only replica.
 *
 * @return {@code true} only when every condition above is satisfied
 */
public boolean isReadOnlyPrimary() {
    // Checks run in the same order as a short-circuiting conjunction would evaluate them.
    if (!indexSettings.isSegRepEnabled()) {
        return false;
    }
    if (!indexSettings.isRemoteStoreEnabled()) {
        return false;
    }
    if (!indexSettings.isCCRReplicatingIndex()) {
        return false;
    }
    return !isReadOnlyReplica;
}

/**
* Returns the underlying primaryModeSupplier.
* @return the primary mode supplier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public InternalTranslogManager translogManager() {
throttle = new IndexThrottle();
try {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
// TODO: Reading the translog UUID from the last committed segments will not work when the segments come from a different cluster (CCR); fix this.
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
TranslogEventListener internalTranslogEventListener = new TranslogEventListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {

@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
public class NRTReplicationEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
// Load NRTReplicationEngine for primaries on CCR Follower as well.
if (config.isReadOnlyReplica() || config.isReadOnlyPrimary()) {
return new NRTReplicationEngine(config);
}
return new InternalEngine(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isRemoteStoreEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
46 changes: 27 additions & 19 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.warmer.ShardIndexWarmerService;
Expand Down Expand Up @@ -327,6 +328,7 @@ Runnable getGlobalCheckpointSyncer() {

private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -351,7 +353,8 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
@Nullable final Store remoteStore,
RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -403,6 +406,7 @@ public IndexShard(
this.pendingPrimaryTerm = primaryTerm;
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger);
this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool);
this.remoteSegmentNotificationPublisher = remoteSegmentNotificationPublisher;
this.replicationTracker = new ReplicationTracker(
shardId,
aId,
Expand Down Expand Up @@ -2182,7 +2186,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
Expand Down Expand Up @@ -3090,6 +3094,7 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
* When remote translog is enabled for an index, replication operation is limited to primary term validation and does not
* update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint.
*/

assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
|| indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint ["
+ globalCheckpoint
Expand Down Expand Up @@ -3498,12 +3503,15 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
// Skip the remote store refresh listener for CCR follower indices.
if (isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher));
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
// Skip SegRep refresh listener for CCR leader as well as follower indices.
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* change to a writeable engine during relocation handoff after a round of segment replication.
Expand Down Expand Up @@ -4078,7 +4086,7 @@ EngineConfigFactory getEngineConfigFactory() {
}

// for tests
ReplicationTracker getReplicationTracker() {
public ReplicationTracker getReplicationTracker() {
return replicationTracker;
}

Expand Down Expand Up @@ -4347,7 +4355,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
Expand Down Expand Up @@ -4380,23 +4388,14 @@ public void close() throws IOException {
onSettingsChanged();
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @param shouldCommit if the shard requires committing the changes after sync from remote.
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
Expand Down Expand Up @@ -4448,6 +4447,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
skippedSegments.add(file);
}
}

if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
Expand All @@ -4459,8 +4459,16 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);

long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
if (shouldCommit) {
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
else {
// We don't need to trigger a commit for segment copy from primaries(like SegRep with remote store, CCR)
finalizeReplication(infosSnapshot);
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
}
}
}
} catch (IOException e) {
Expand Down
Loading