Skip to content

Commit a8eff82

Browse files
committed
Apply (even more of) Scott's requested changes
1 parent e5cbef1 commit a8eff82

File tree

11 files changed

+170
-239
lines changed

11 files changed

+170
-239
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ public abstract class IndexingBase {
100100
private StoreTimerSnapshot lastProgressSnapshot = null;
101101
private boolean forceStampOverwrite = false;
102102
private final long startingTimeMillis;
103-
private long lastTypeStampCheckMillis;
104103
private Map<String, IndexingMerger> indexingMergerMap = null;
105104
@Nullable
106105
private IndexingHeartbeat heartbeat = null; // this will stay null for index scrubbing
@@ -119,7 +118,6 @@ public abstract class IndexingBase {
119118
this.isScrubber = isScrubber;
120119
this.throttle = new IndexingThrottle(common, isScrubber);
121120
this.startingTimeMillis = System.currentTimeMillis();
122-
this.lastTypeStampCheckMillis = startingTimeMillis;
123121
}
124122

125123
// helper functions
@@ -176,8 +174,9 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable) {
176174
message.addKeyAndValue(LogMessageKeys.RESULT, "success");
177175
LOGGER.info(message.toString());
178176
}
179-
// Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in
180-
// these dedicated transaction. Heartbeat clearing is not a blocker but a "best effort" operation.
177+
// Here: if the heartbeats were not fully cleared while marking the index as readable, they will be cleared in
178+
// this dedicated transaction. Clearing the heartbeats at the end of the indexing session is a "best effort"
179+
// operation, hence exceptions are ignored.
181180
return clearHeartbeats()
182181
.handle((ignoreRet, ignoreEx) -> null);
183182
},
@@ -825,18 +824,17 @@ private CompletableFuture<Boolean> hadTransactionReachedLimits(FDBRecordStore st
825824
}
826825

827826
private CompletableFuture<Void> validateTypeStamp(@Nonnull FDBRecordStore store) {
828-
if (shouldValidate()) {
829-
// check other heartbeats (if exclusive) & typestamp
830-
final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp = getIndexingTypeStamp(store);
831-
return forEachTargetIndex(index -> CompletableFuture.allOf(
832-
updateHeartbeat(true, store, index),
833-
store.loadIndexingTypeStampAsync(index)
834-
.thenAccept(typeStamp -> validateTypeStamp(typeStamp, expectedTypeStamp, index))
835-
));
836-
} else {
837-
// update only
838-
return forEachTargetIndex(index -> updateHeartbeat(false, store, index));
827+
// check other heartbeats (if exclusive) & typestamp
828+
if (isScrubber) {
829+
// Scrubber's type-stamp is never commited. It is protected by expecting a READABLE index state.
830+
return AsyncUtil.DONE;
839831
}
832+
final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp = getIndexingTypeStamp(store);
833+
return forEachTargetIndex(index -> CompletableFuture.allOf(
834+
updateHeartbeat(true, store, index),
835+
store.loadIndexingTypeStampAsync(index)
836+
.thenAccept(typeStamp -> validateTypeStamp(typeStamp, expectedTypeStamp, index))
837+
));
840838
}
841839

842840
private CompletableFuture<Void> updateHeartbeat(boolean validate, FDBRecordStore store, Index index) {
@@ -877,22 +875,6 @@ private void clearHeartbeatForIndex(FDBRecordStore store, Index index) {
877875
}
878876
}
879877

880-
881-
private boolean shouldValidate() {
882-
final long minimalInterval = policy.getCheckIndexingMethodFrequencyMilliseconds();
883-
if (minimalInterval < 0 || isScrubber) {
884-
return false;
885-
}
886-
if (minimalInterval > 0) {
887-
final long now = System.currentTimeMillis();
888-
if (now < lastTypeStampCheckMillis + minimalInterval) {
889-
return false;
890-
}
891-
lastTypeStampCheckMillis = now;
892-
}
893-
return true;
894-
}
895-
896878
private void validateTypeStamp(final IndexBuildProto.IndexBuildIndexingStamp typeStamp,
897879
final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp,
898880
Index index) {
@@ -1028,7 +1010,6 @@ public CompletableFuture<Void> rebuildIndexAsync(@Nonnull FDBRecordStore store)
10281010
}))
10291011
.thenCompose(vignore -> setIndexingTypeOrThrow(store, false))
10301012
.thenCompose(vignore -> rebuildIndexInternalAsync(store))
1031-
// If any of the indexes' heartbeats, for any reason, was not cleared during "mark readable", clear it here
10321013
.whenComplete((ignore, ignoreEx) -> clearHeartbeats(store));
10331014
}
10341015

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -93,33 +93,34 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
9393
return false;
9494
}
9595
final KeyValue kv = iterator.next();
96-
try {
97-
final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey());
98-
if (!otherIndexerId.equals(this.indexerId)) {
99-
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
100-
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
101-
if (age > 0 && age < leaseLength) {
102-
// For practical reasons, this exception is backward compatible to the Synchronized Lock one
103-
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
104-
.addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
105-
.addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
106-
.addLogInfo(LogMessageKeys.AGE_MILLISECONDS, age)
107-
.addLogInfo(LogMessageKeys.TIME_LIMIT_MILLIS, leaseLength);
108-
}
109-
}
110-
} catch (InvalidProtocolBufferException e) {
111-
if (logger.isWarnEnabled()) {
112-
logger.warn(KeyValueLogMessage.of("Bad indexing heartbeat item",
113-
LogMessageKeys.KEY, kv.getKey(),
114-
LogMessageKeys.VALUE, kv.getValue()));
115-
}
116-
}
96+
checkSingleHeartbeat(store, index, kv, now);
11797
return true;
11898
}))
119-
.thenApply(ignore -> {
120-
updateHeartbeat(store, index);
121-
return null;
122-
});
99+
.thenAccept(ignore -> updateHeartbeat(store, index));
100+
}
101+
102+
private void checkSingleHeartbeat(final @Nonnull FDBRecordStore store, final @Nonnull Index index, final KeyValue kv, final long now) {
103+
try {
104+
final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey());
105+
if (!otherIndexerId.equals(this.indexerId)) {
106+
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
107+
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
108+
if (age > 0 && age < leaseLength) {
109+
// For practical reasons, this exception is backward compatible to the Synchronized Lock one
110+
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
111+
.addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
112+
.addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
113+
.addLogInfo(LogMessageKeys.AGE_MILLISECONDS, age)
114+
.addLogInfo(LogMessageKeys.TIME_LIMIT_MILLIS, leaseLength);
115+
}
116+
}
117+
} catch (InvalidProtocolBufferException e) {
118+
if (logger.isWarnEnabled()) {
119+
logger.warn(KeyValueLogMessage.of("Bad indexing heartbeat item",
120+
LogMessageKeys.KEY, kv.getKey(),
121+
LogMessageKeys.VALUE, kv.getValue()));
122+
}
123+
}
123124
}
124125

125126
public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,11 @@ private IndexingBase getIndexer() {
315315
}
316316

317317
/**
318+
* <em>Deprecated</em> and unused.
318319
* This {@link Exception} can be thrown in the case that one calls one of the methods
319320
* that explicitly state that they are building an unbuilt range, i.e., a range of keys
320321
* that contains no keys which have yet been processed by the {@link OnlineIndexer}
321322
* during an index build.
322-
* <em>Deprecated</em> and unused.
323323
*/
324324
@API(API.Status.DEPRECATED)
325325
@SuppressWarnings("serial")
@@ -480,7 +480,7 @@ public boolean checkAnyOngoingOnlineIndexBuilds() {
480480
}
481481

482482
/**
483-
* Check if the main index is being built. Note that with shared heartbeats, this function will now return true for any active session - mutual or exclusive.
483+
* Check if the provided index is being built. Note that with shared heartbeats, this function will now return true for any active session - mutual or exclusive.
484484
* @return a future that will complete to <code>true</code> if the index is being built and <code>false</code> otherwise
485485
*/
486486
public CompletableFuture<Boolean> checkAnyOngoingOnlineIndexBuildsAsync() {
@@ -490,7 +490,7 @@ public CompletableFuture<Boolean> checkAnyOngoingOnlineIndexBuildsAsync() {
490490
}
491491

492492
/**
493-
* Check if the main index is being built. Note that with shared heartbeats, this function will now return true for any active session - mutual or exclusive.
493+
* Check if the provided index is being built. Note that with shared heartbeats, this function will now return true for any active session - mutual or exclusive.
494494
* Where "active session" is determined by an indexing heartbeat that is less than {@link OnlineIndexOperationConfig#DEFAULT_LEASE_LENGTH_MILLIS} old.
495495
* @param recordStore record store whose index builds need to be checked
496496
* @param index the index to check for ongoing index builds
@@ -501,7 +501,7 @@ public static CompletableFuture<Boolean> checkAnyOngoingOnlineIndexBuildsAsync(@
501501
}
502502

503503
/**
504-
* Check if the main index is being built. Note that with shared heartbeats, this function will now return true for any active session - mutual or exclusive.
504+
* Check if the provided index is being built. Note that with shared heartbeats, this function will now return true for any active session - mutual or exclusive.
505505
* @param recordStore record store whose index builds need to be checked
506506
* @param index the index to check for ongoing index builds
507507
* @param leasingMilliseconds max heartbeat age to be considered an "active session"
@@ -517,7 +517,14 @@ public static CompletableFuture<Boolean> checkAnyOngoingOnlineIndexBuildsAsync(@
517517

518518
/**
519519
* Builds an index across multiple transactions.
520-
* This is a slow and retrying operation that is intended to be executed by background processes.
520+
* <p>
521+
* If the indexing session is not mutual, it will stop with {@link com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException}
522+
* if there is another active indexing session on the same index. It first checks and updates index states and
523+
* clear index data respecting the {@link IndexStatePrecondition} being set. It then builds the index across
524+
* multiple transactions honoring the rate-limiting parameters set in the constructor of this class. It also retries
525+
* any retriable errors that it encounters while it runs the build. At the end, it marks the index readable in the
526+
* store.
527+
* </p>
521528
* @return a future that will be ready when the build has completed
522529
* @throws com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException the build is stopped
523530
* because there may be another build running actively on this index.
@@ -586,9 +593,10 @@ public Map<String, IndexBuildProto.IndexBuildIndexingStamp> queryIndexingStamps(
586593

587594
/**
588595
* Block partly built indexes, preventing continuation.
589-
* Active indexing sessions will check for this block according to {@link IndexingPolicy.Builder#checkIndexingStampFrequencyMilliseconds(long)}.}
590-
* @param id if non null, will be added to the "indexing stamp" as an id/hint for the blocking reason.
591-
* @param ttlSeconds if non null, the block will automatically expire after this value (in seconds).
596+
* Active indexing sessions will check for this block during every iterating transaction, which still requires the caller to
597+
* wait a few seconds before assuming that all indexing had stopped.
598+
* @param id if non-null, will be added to the "indexing stamp" as an id/hint for the blocking reason.
599+
* @param ttlSeconds if non-null, the block will automatically expire after this value (in seconds).
592600
* @return a map of target indexes and their "indexing stamps" after the change.
593601
*/
594602
@API(API.Status.EXPERIMENTAL)
@@ -635,7 +643,7 @@ public Map<UUID, IndexBuildProto.IndexBuildHeartbeat> getIndexingHeartbeats(int
635643
* Clear old indexing heartbeats for a given index (single target or primary index).
636644
* Typically, heartbeats are deleted either at the end of an indexing sessions or when the index becomes readable. This
637645
* cleanup function can be used if, for any reason, the heartbeats could not be deleted from the database at the end of a session.
638-
* @param minAgeMilliseconds minimum heartbeat age (time elapsed since heartbeat creation, in milliseconds) to clear.
646+
* @param minAgeMilliseconds minimum heartbeat age (time elapsed since the last heartbeat, in milliseconds) to clear.
639647
* @param maxIteration safety valve to limit number of items to check. Typically set to zero to keep unlimited
640648
* @return number of cleared heartbeats
641649
*/
@@ -956,7 +964,6 @@ public static class IndexingPolicy {
956964
private final DesiredAction ifReadable;
957965
private final boolean allowUniquePendingState;
958966
private final Set<TakeoverTypes> allowedTakeoverSet;
959-
private final long checkIndexingMethodFrequencyMilliseconds;
960967
private final boolean mutualIndexing;
961968
private final List<Tuple> mutualIndexingBoundaries;
962969
private final boolean allowUnblock;
@@ -1015,7 +1022,6 @@ public enum TakeoverTypes {
10151022
private IndexingPolicy(@Nullable String sourceIndex, @Nullable Object sourceIndexSubspaceKey, boolean forbidRecordScan,
10161023
DesiredAction ifDisabled, DesiredAction ifWriteOnly, DesiredAction ifMismatchPrevious, DesiredAction ifReadable,
10171024
boolean allowUniquePendingState, Set<TakeoverTypes> allowedTakeoverSet,
1018-
long checkIndexingMethodFrequencyMilliseconds,
10191025
boolean mutualIndexing, List<Tuple> mutualIndexingBoundaries,
10201026
boolean allowUnblock, String allowUnblockId,
10211027
long initialMergesCountLimit,
@@ -1029,7 +1035,6 @@ private IndexingPolicy(@Nullable String sourceIndex, @Nullable Object sourceInde
10291035
this.ifReadable = ifReadable;
10301036
this.allowUniquePendingState = allowUniquePendingState;
10311037
this.allowedTakeoverSet = allowedTakeoverSet;
1032-
this.checkIndexingMethodFrequencyMilliseconds = checkIndexingMethodFrequencyMilliseconds;
10331038
this.mutualIndexing = mutualIndexing;
10341039
this.mutualIndexingBoundaries = mutualIndexingBoundaries;
10351040
this.allowUnblock = allowUnblock;
@@ -1101,7 +1106,6 @@ public Builder toBuilder() {
11011106
.setIfReadable(ifReadable)
11021107
.allowUniquePendingState(allowUniquePendingState)
11031108
.allowTakeoverContinue(allowedTakeoverSet)
1104-
.checkIndexingStampFrequencyMilliseconds(checkIndexingMethodFrequencyMilliseconds)
11051109
.setMutualIndexing(mutualIndexing)
11061110
.setMutualIndexingBoundaries(mutualIndexingBoundaries)
11071111
.setAllowUnblock(allowUnblock, allowUnblockId)
@@ -1226,11 +1230,13 @@ public boolean shouldAllowUnblock(String stampBlockId) {
12261230
}
12271231

12281232
/**
1233+
* <em>Deprecated</em> and unused.
12291234
* If negative, avoid checks. Else, minimal interval between checks.
1230-
* @return minmal interval in milliseconds.
1235+
* @return minimal interval in milliseconds.
12311236
*/
1237+
@API(API.Status.DEPRECATED)
12321238
public long getCheckIndexingMethodFrequencyMilliseconds() {
1233-
return this.checkIndexingMethodFrequencyMilliseconds;
1239+
return 0;
12341240
}
12351241

12361242
/**
@@ -1275,7 +1281,6 @@ public static class Builder {
12751281
private DesiredAction ifReadable = DesiredAction.CONTINUE;
12761282
private boolean doAllowUniquePendingState = false;
12771283
private Set<TakeoverTypes> allowedTakeoverSet = null;
1278-
private long checkIndexingStampFrequency = 5_000;
12791284
private boolean useMutualIndexing = false;
12801285
private List<Tuple> useMutualIndexingBoundaries = null;
12811286
private boolean allowUnblock = false;
@@ -1457,6 +1462,7 @@ public Builder allowTakeoverContinue(@Nullable Collection<TakeoverTypes> allowed
14571462
}
14581463

14591464
/**
1465+
* <em>Deprecated</em> - for better consistency, the type stamp will now be validated during every iterating transaction.
14601466
* During indexing, the indexer can check the current indexing stamp and throw an exception if it had changed.
14611467
* This may happen if another indexing type takes over or by an indexing block (see {@link #indexingStamp}).
14621468
* The argument may be:
@@ -1467,8 +1473,9 @@ public Builder allowTakeoverContinue(@Nullable Collection<TakeoverTypes> allowed
14671473
* @param frequency : If negative, avoid checks. Else, minimal interval between checks
14681474
* @return this builder.
14691475
*/
1476+
@API(API.Status.DEPRECATED)
14701477
public Builder checkIndexingStampFrequencyMilliseconds(long frequency) {
1471-
this.checkIndexingStampFrequency = frequency;
1478+
// No-op
14721479
return this;
14731480
}
14741481

@@ -1577,7 +1584,6 @@ public IndexingPolicy build() {
15771584
return new IndexingPolicy(sourceIndex, sourceIndexSubspaceKey, forbidRecordScan,
15781585
ifDisabled, ifWriteOnly, ifMismatchPrevious, ifReadable,
15791586
doAllowUniquePendingState, allowedTakeoverSet,
1580-
checkIndexingStampFrequency,
15811587
useMutualIndexing, useMutualIndexingBoundaries, allowUnblock, allowUnblockId,
15821588
initialMergesCountLimit, reverseScanOrder);
15831589
}

0 commit comments

Comments
 (0)