Skip to content

Commit e5cbef1

Browse files
committed
Apply (more of) Scott's requested changes
1 parent f9c08b2 commit e5cbef1

File tree

13 files changed

+375
-163
lines changed

13 files changed

+375
-163
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4965,6 +4965,7 @@ public void removeFormerIndex(FormerIndex formerIndex) {
49654965
}
49664966

49674967
private void clearReadableIndexBuildData(Index index) {
4968+
// Clear index maintenance data that is unneeded once the index becomes readable
49684969
IndexingRangeSet.forIndexBuild(this, index).clear();
49694970
IndexingHeartbeat.clearAllHeartbeats(this, index);
49704971
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public abstract class IndexingBase {
102102
private final long startingTimeMillis;
103103
private long lastTypeStampCheckMillis;
104104
private Map<String, IndexingMerger> indexingMergerMap = null;
105+
@Nullable
105106
private IndexingHeartbeat heartbeat = null; // this will stay null for index scrubbing
106107

107108
IndexingBase(@Nonnull IndexingCommon common,
@@ -157,15 +158,13 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable) {
157158
long startNanos = System.nanoTime();
158159
FDBDatabaseRunner runner = getRunner();
159160
final FDBStoreTimer timer = runner.getTimer();
160-
if ( timer != null) {
161+
if (timer != null) {
161162
lastProgressSnapshot = StoreTimerSnapshot.from(timer);
162163
}
163-
AtomicReference<Throwable> indexingException = new AtomicReference<>(null);
164-
return handleStateAndDoBuildIndexAsync(markReadable, message)
165-
.handle((ret, ex) -> {
166-
if (ex != null) {
167-
indexingException.set(ex);
168-
}
164+
return MoreAsyncUtil.composeWhenComplete(
165+
handleStateAndDoBuildIndexAsync(markReadable, message),
166+
(result, ex) -> {
167+
// proper log
169168
message.addKeysAndValues(indexingLogMessageKeyValues()) // add these here to pick up state accumulated during build
170169
.addKeysAndValues(common.indexLogMessageKeyValues())
171170
.addKeyAndValue(LogMessageKeys.TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos));
@@ -177,20 +176,13 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable) {
177176
message.addKeyAndValue(LogMessageKeys.RESULT, "success");
178177
LOGGER.info(message.toString());
179178
}
180-
return ret;
181-
})
182-
// Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in
183-
// these dedicated transaction. Heartbeat clearing is not a blocker but a "best effort" operation.
184-
.thenCompose(ignore -> clearHeartbeats())
185-
.handle((ignore, exIgnore) -> {
186-
Throwable ex = indexingException.get();
187-
if (ex instanceof RuntimeException) {
188-
throw (RuntimeException) ex;
189-
} else if (ex != null) {
190-
throw new RuntimeException(ex);
191-
}
192-
return null;
193-
});
179+
// Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in
180+
// these dedicated transaction. Heartbeat clearing is not a blocker but a "best effort" operation.
181+
return clearHeartbeats()
182+
.handle((ignoreRet, ignoreEx) -> null);
183+
},
184+
getRunner().getDatabase()::mapAsyncToSyncException
185+
);
194186
}
195187

196188
abstract List<Object> indexingLogMessageKeyValues();
@@ -271,7 +263,8 @@ private CompletableFuture<Void> handleStateAndDoBuildIndexAsync(boolean markRead
271263
doIndex ?
272264
buildIndexInternalAsync().thenApply(ignore -> markReadable) :
273265
AsyncUtil.READY_FALSE
274-
).thenCompose(this::markIndexReadable).thenApply(ignore -> null);
266+
).thenCompose(this::markIndexReadable
267+
).thenApply(ignore -> null);
275268
}
276269

277270
private CompletableFuture<Void> markIndexesWriteOnly(boolean continueBuild, FDBRecordStore store) {
@@ -317,7 +310,7 @@ public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease)
317310
// Mark each index readable in its own (retriable, parallel) transaction. If one target fails to become
318311
// readable, it should not affect the others.
319312
return forEachTargetIndex(index ->
320-
markIndexReadableSingleTarget(index, anythingChanged, runtimeExceptionAtomicReference)
313+
markIndexReadableForIndex(index, anythingChanged, runtimeExceptionAtomicReference)
321314
).thenApply(ignore -> {
322315
RuntimeException ex = runtimeExceptionAtomicReference.get();
323316
if (ex != null) {
@@ -328,13 +321,13 @@ public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease)
328321
});
329322
}
330323

331-
private CompletableFuture<Boolean> markIndexReadableSingleTarget(Index index, AtomicBoolean anythingChanged,
332-
AtomicReference<RuntimeException> runtimeExceptionAtomicReference) {
324+
private CompletableFuture<Boolean> markIndexReadableForIndex(Index index, AtomicBoolean anythingChanged,
325+
AtomicReference<RuntimeException> runtimeExceptionAtomicReference) {
333326
// An extension function to reduce markIndexReadable's complexity
334327
return getRunner().runAsync(context ->
335328
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
336329
.thenCompose(store -> {
337-
clearHeartbeatSingleTarget(store, index);
330+
clearHeartbeatForIndex(store, index);
338331
return policy.shouldAllowUniquePendingState(store) ?
339332
store.markIndexReadableOrUniquePending(index) :
340333
store.markIndexReadable(index);
@@ -376,7 +369,7 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
376369
if (forceStampOverwrite && !continuedBuild) {
377370
// Fresh session + overwrite = no questions asked
378371
store.saveIndexingTypeStamp(index, newStamp);
379-
return AsyncUtil.DONE ;
372+
return AsyncUtil.DONE;
380373
}
381374
return store.loadIndexingTypeStampAsync(index)
382375
.thenCompose(savedStamp -> {
@@ -858,30 +851,27 @@ private CompletableFuture<Void> updateHeartbeat(boolean validate, FDBRecordStore
858851

859852
private CompletableFuture<Void> clearHeartbeats() {
860853
if (heartbeat == null) {
854+
// Here: either silent heartbeats or heartbeats had been cleared during markReadable phase
861855
return AsyncUtil.DONE;
862856
}
863-
return forEachTargetIndex(this::clearHeartbeatSingleTarget)
864-
.thenAccept(ignore -> heartbeat = null);
857+
// Here: for each index we clear (only) the heartbeat generated by this indexer. This is a quick operation that can be done in a single transaction.
858+
return getRunner().runAsync(context ->
859+
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
860+
.thenApply(store -> {
861+
clearHeartbeats(store);
862+
return null;
863+
}));
865864
}
866865

867866
private void clearHeartbeats(FDBRecordStore store) {
868867
if (heartbeat != null) {
869868
for (Index index : common.getTargetIndexes()) {
870-
clearHeartbeatSingleTarget(store, index);
869+
heartbeat.clearHeartbeat(store, index);
871870
}
872871
}
873872
}
874873

875-
private CompletableFuture<Void> clearHeartbeatSingleTarget(Index index) {
876-
return getRunner().runAsync(context ->
877-
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
878-
.thenApply(store -> {
879-
clearHeartbeatSingleTarget(store, index);
880-
return null;
881-
}));
882-
}
883-
884-
private void clearHeartbeatSingleTarget(FDBRecordStore store, Index index) {
874+
private void clearHeartbeatForIndex(FDBRecordStore store, Index index) {
885875
if (heartbeat != null) {
886876
heartbeat.clearHeartbeat(store, index);
887877
}
@@ -1115,9 +1105,9 @@ public CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getInde
11151105
.thenCompose(store -> IndexingHeartbeat.getIndexingHeartbeats(store, common.getPrimaryIndex(), maxCount)));
11161106
}
11171107

1118-
public CompletableFuture<Integer> clearIndexingHeartbeats(long minAgenMilliseconds, int maxIteration) {
1108+
public CompletableFuture<Integer> clearIndexingHeartbeats(long minAgeMilliseconds, int maxIteration) {
11191109
return getRunner().runAsync(context -> openRecordStore(context)
1120-
.thenCompose(store -> IndexingHeartbeat.clearIndexingHeartbeats(store, common.getPrimaryIndex(), minAgenMilliseconds, maxIteration)));
1110+
.thenCompose(store -> IndexingHeartbeat.clearIndexingHeartbeats(store, common.getPrimaryIndex(), minAgeMilliseconds, maxIteration)));
11211111
}
11221112

11231113
/**

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
package com.apple.foundationdb.record.provider.foundationdb;
2222

2323
import com.apple.foundationdb.KeyValue;
24+
import com.apple.foundationdb.annotation.API;
2425
import com.apple.foundationdb.async.AsyncIterator;
2526
import com.apple.foundationdb.async.AsyncUtil;
2627
import com.apple.foundationdb.record.IndexBuildProto;
28+
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
2729
import com.apple.foundationdb.record.logging.LogMessageKeys;
2830
import com.apple.foundationdb.record.metadata.Index;
2931
import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException;
3032
import com.google.protobuf.InvalidProtocolBufferException;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3135

3236
import javax.annotation.Nonnull;
3337
import java.util.HashMap;
@@ -36,8 +40,20 @@
3640
import java.util.concurrent.CompletableFuture;
3741
import java.util.concurrent.atomic.AtomicInteger;
3842

43+
/**
44+
* Indexing Shared Heartbeats can be used to define and handle "active" indexing processes.
45+
* Every indexer should update its unique heartbeat during its indexing iteration. If the indexing session is optimized for
46+
* non-mutual (as defined by the indexing type, see {@link IndexBuildProto.IndexBuildIndexingStamp}), detecting an existing
47+
* active heartbeat will help preventing concurrent, conflicting, indexing attempts.
48+
* In addition, the heartbeats can be used by users to query activity status of ongoing indexing sessions.
49+
*/
50+
@API(API.Status.INTERNAL)
3951
public class IndexingHeartbeat {
4052
// [prefix, indexerId] -> [indexing-type, genesis time, heartbeat time]
53+
@Nonnull
54+
private static final Logger logger = LoggerFactory.getLogger(IndexingHeartbeat.class);
55+
public static final String INVALID_HEARTBEAT_INFO = "<< Invalid Heartbeat >>";
56+
4157
final UUID indexerId;
4258
final String info;
4359
final long genesisTimeMilliseconds;
@@ -53,7 +69,7 @@ public IndexingHeartbeat(final UUID indexerId, String info, long leaseLength, bo
5369
}
5470

5571
public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
56-
byte[] key = IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack();
72+
byte[] key = IndexingSubspaces.indexHeartbeatSubspace(store, index, indexerId).pack();
5773
byte[] value = IndexBuildProto.IndexBuildHeartbeat.newBuilder()
5874
.setInfo(info)
5975
.setGenesisTimeMilliseconds(genesisTimeMilliseconds)
@@ -92,7 +108,11 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
92108
}
93109
}
94110
} catch (InvalidProtocolBufferException e) {
95-
throw new RuntimeException(e);
111+
if (logger.isWarnEnabled()) {
112+
logger.warn(KeyValueLogMessage.of("Bad indexing heartbeat item",
113+
LogMessageKeys.KEY, kv.getKey(),
114+
LogMessageKeys.VALUE, kv.getValue()));
115+
}
96116
}
97117
return true;
98118
}))
@@ -103,11 +123,11 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
103123
}
104124

105125
public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
106-
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack());
126+
store.ensureContextActive().clear(IndexingSubspaces.indexHeartbeatSubspace(store, index, indexerId).pack());
107127
}
108128

109129
public static void clearAllHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index) {
110-
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index).range());
130+
store.ensureContextActive().clear(IndexingSubspaces.indexHeartbeatSubspace(store, index).range());
111131
}
112132

113133
public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
@@ -130,15 +150,17 @@ public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>>
130150
} catch (InvalidProtocolBufferException e) {
131151
// Let the caller know about this invalid heartbeat.
132152
ret.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
133-
.setInfo("<< Invalid Heartbeat >>")
153+
.setInfo(INVALID_HEARTBEAT_INFO)
154+
.setGenesisTimeMilliseconds(0)
155+
.setHeartbeatTimeMilliseconds(0)
134156
.build());
135157
}
136158
return true;
137159
}))
138160
.thenApply(ignore -> ret);
139161
}
140162

141-
public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index, long minAgenMilliseconds, int maxIteration) {
163+
public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index, long minAgeMilliseconds, int maxIteration) {
142164
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
143165
final AtomicInteger deleteCount = new AtomicInteger(0);
144166
final AtomicInteger iterationCount = new AtomicInteger(0);
@@ -156,7 +178,7 @@ public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRec
156178
try {
157179
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
158180
// remove heartbeat if too old
159-
shouldRemove = now + minAgenMilliseconds >= otherHeartbeat.getHeartbeatTimeMilliseconds();
181+
shouldRemove = now >= otherHeartbeat.getHeartbeatTimeMilliseconds() + minAgeMilliseconds;
160182
} catch (InvalidProtocolBufferException e) {
161183
// remove heartbeat if invalid
162184
shouldRemove = true;
@@ -171,11 +193,11 @@ public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRec
171193
}
172194

173195
private static AsyncIterator<KeyValue> heartbeatsIterator(FDBRecordStore store, Index index) {
174-
return store.getContext().ensureActive().snapshot().getRange(IndexingSubspaces.indexheartbeatSubspace(store, index).range()).iterator();
196+
return store.getContext().ensureActive().getRange(IndexingSubspaces.indexHeartbeatSubspace(store, index).range()).iterator();
175197
}
176198

177199
private static UUID heartbeatKeyToIndexerId(FDBRecordStore store, Index index, byte[] key) {
178-
return IndexingSubspaces.indexheartbeatSubspace(store, index).unpack(key).getUUID(0);
200+
return IndexingSubspaces.indexHeartbeatSubspace(store, index).unpack(key).getUUID(0);
179201
}
180202

181203
private static long nowMilliseconds() {

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingSubspaces.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static Subspace indexBuildTypeSubspace(@Nonnull FDBRecordStoreBase<?> sto
9292
* @return subspace
9393
*/
9494
@Nonnull
95-
public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index) {
95+
public static Subspace indexHeartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index) {
9696
return indexBuildSubspace(store, index, INDEX_BUILD_HEARTBEAT_PREFIX);
9797
}
9898

@@ -104,8 +104,8 @@ public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> sto
104104
* @return subspace
105105
*/
106106
@Nonnull
107-
public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index, @Nonnull UUID indexerId) {
108-
return indexheartbeatSubspace(store, index).subspace(Tuple.from(indexerId));
107+
public static Subspace indexHeartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index, @Nonnull UUID indexerId) {
108+
return indexHeartbeatSubspace(store, index).subspace(Tuple.from(indexerId));
109109
}
110110

111111
/**
@@ -210,6 +210,6 @@ public static void eraseAllIndexingDataButTheLock(@Nonnull FDBRecordContext cont
210210
context.clear(Range.startsWith(indexBuildScannedRecordsSubspace(store, index).pack()));
211211
context.clear(Range.startsWith(indexBuildTypeSubspace(store, index).pack()));
212212
// The heartbeats, unlike the sync lock, may be erased here. If needed, an appropriate heartbeat will be set after this clear & within the same transaction.
213-
context.clear(Range.startsWith(indexheartbeatSubspace(store, index).pack()));
213+
context.clear(Range.startsWith(indexHeartbeatSubspace(store, index).pack()));
214214
}
215215
}

0 commit comments

Comments
 (0)