diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java index 7ca244d8c2..3e114200fd 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java @@ -33,6 +33,7 @@ import com.apple.foundationdb.util.CloseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -100,7 +101,9 @@ protected RecordRepair(@Nonnull final Builder config) { this.database = config.database; this.storeBuilder = config.getStoreBuilder(); this.validationKind = config.getValidationKind(); - ThrottledRetryingIterator.Builder iteratorBuilder = ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem); + ThrottledRetryingIterator.Builder iteratorBuilder = + ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem) + .withMdcContext(MDC.getCopyOfContextMap()); throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build(); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index b47e2ba554..66ed060a4d 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -389,14 +390,6 @@ void init() { } } - public static Builder builder(TransactionalRunner runner, - Executor executor, - ScheduledExecutorService scheduledExecutor, - CursorFactory cursorCreator, - ItemHandler singleItemHandler) { - return new Builder<>(runner, executor, scheduledExecutor, cursorCreator, singleItemHandler); - } - public static Builder builder(FDBDatabase database, CursorFactory cursorCreator, ItemHandler singleItemHandler) { @@ -409,9 +402,13 @@ public static Builder builder(FDBDatabase database, * @param the item type being iterated on. */ public static class Builder { - private final TransactionalRunner transactionalRunner; - private final Executor executor; - private final ScheduledExecutorService scheduledExecutor; + // Fields constructed during build() + private TransactionalRunner transactionalRunner; + private Executor executor; + private ScheduledExecutorService scheduledExecutor; + // Fields initialized by setters/constructor + private FDBDatabase database; + private FDBRecordContextConfig.Builder contextConfigBuilder; private final CursorFactory cursorCreator; private final ItemHandler singleItemHandler; private Consumer transactionSuccessNotification; @@ -422,17 +419,10 @@ public static class Builder { private int maxRecordDeletesPerSec; private int numOfRetries; - /** - * Constructor. - * @param runner the FDB runner to use when creating transactions - * @param cursorCreator the factory to use when creating the inner cursor - * @param singleItemHandler the handler of a single item while iterating - */ - private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutorService scheduledExecutor, CursorFactory cursorCreator, ItemHandler singleItemHandler) { + private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { // Mandatory fields are set in the constructor. Everything else is optional. - this.transactionalRunner = runner; - this.executor = executor; - this.scheduledExecutor = scheduledExecutor; + this.database = database; + this.contextConfigBuilder = contextConfigBuilder; this.cursorCreator = cursorCreator; this.singleItemHandler = singleItemHandler; // set defaults @@ -443,14 +433,6 @@ private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutor this.numOfRetries = NUMBER_OF_RETRIES; } - private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { - this(new TransactionalRunner(database, contextConfigBuilder), - database.newContextExecutor(contextConfigBuilder.getMdcContext()), - database.getScheduledExecutor(), - cursorCreator, - singleItemHandler); - } - /** * Set the amount of time for each transaction before committing and starting another. * Defaults to 4000. @@ -538,11 +520,27 @@ public Builder withNumOfRetries(int numOfRetries) { return this; } + /** + * Set the MDC context for the runner/executor. + * This MDC context will be carried out into the runner and executor and will allow them to pass that down to + * LOGGER calls used by the item handlers. + * Defaults to empty context. + * @param mdcContext the MDC context to use + * @return this builder + */ + public Builder withMdcContext(Map mdcContext) { + this.contextConfigBuilder.setMdcContext(mdcContext); + return this; + } + /** * Create the iterator. * @return the newly minted iterator */ public ThrottledRetryingIterator build() { + this.transactionalRunner = new TransactionalRunner(database, contextConfigBuilder); + this.executor = database.newContextExecutor(contextConfigBuilder.getMdcContext()); + this.scheduledExecutor = database.getScheduledExecutor(); return new ThrottledRetryingIterator<>(this); } } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index aae40a62f0..ab913268f0 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -36,11 +36,15 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.MDC; import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; @@ -51,6 +55,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -718,6 +723,60 @@ void testIteratorClosesOnNextCloses() throws Exception { Assertions.assertThat(cursor.get().isClosed()).isTrue(); } + private static Stream mdcParams() { + return Stream.of( + Arguments.of("value", 10, 0, 1), // have MDC value, one transaction + Arguments.of(null, 10, 0, 1), // null MDC, one transaction + Arguments.of("value", 10, 6, 2) // have MDC, 2 transactions + ); + } + + @ParameterizedTest + @MethodSource("mdcParams") + void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTransaction, int expectedTransactions) throws Exception { + String mdcKey = "mdckey"; + final Map original = MDC.getCopyOfContextMap(); + + try { + // Set MDC context if provided + if (mdcValue != null) { + MDC.clear(); + MDC.put(mdcKey, mdcValue); + } + final AtomicInteger transactionCount = new AtomicInteger(0); + final Consumer transactionStart = + quotaManager -> transactionCount.incrementAndGet(); + final ItemHandler itemHandler = (store, item, quotaManager) -> { + assertThat(MDC.get(mdcKey)).isEqualTo(mdcValue); // covers null case + quotaManager.deleteCountInc(); + return AsyncUtil.DONE; + }; + + final FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + storeBuilder = recordStore.asBuilder(); + commit(context); + } + + Map mdcContext = (mdcValue == null) ? null : MDC.getCopyOfContextMap(); + ThrottledRetryingIterator.Builder builder = + ThrottledRetryingIterator.builder(fdb, intCursor(numRecords, null), itemHandler) + .withMdcContext(mdcContext); + + builder.withMaxRecordsDeletesPerTransaction(deletesPerTransaction) + .withTransactionInitNotification(transactionStart); + + try (ThrottledRetryingIterator throttledIterator = builder.build()) { + throttledIterator.iterateAll(storeBuilder).join(); + } + + assertThat(transactionCount.get()).isEqualTo(expectedTransactions); + } finally { + MDC.setContextMap(original); + } + } + private ThrottledRetryingIterator.Builder iteratorBuilder(final int numRecords, final ItemHandler itemHandler, final Consumer initNotification,