Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.apple.foundationdb.util.CloseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -100,7 +101,9 @@ protected RecordRepair(@Nonnull final Builder config) {
this.database = config.database;
this.storeBuilder = config.getStoreBuilder();
this.validationKind = config.getValidationKind();
ThrottledRetryingIterator.Builder<Tuple> iteratorBuilder = ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem);
ThrottledRetryingIterator.Builder<Tuple> iteratorBuilder =
ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem)
.withMdcContext(MDC.getCopyOfContextMap());
throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -389,14 +390,6 @@ void init() {
}
}

public static <T> Builder<T> builder(TransactionalRunner runner,
Executor executor,
ScheduledExecutorService scheduledExecutor,
CursorFactory<T> cursorCreator,
ItemHandler<T> singleItemHandler) {
return new Builder<>(runner, executor, scheduledExecutor, cursorCreator, singleItemHandler);
}

public static <T> Builder<T> builder(FDBDatabase database,
CursorFactory<T> cursorCreator,
ItemHandler<T> singleItemHandler) {
Expand All @@ -409,9 +402,13 @@ public static <T> Builder<T> builder(FDBDatabase database,
* @param <T> the item type being iterated on.
*/
public static class Builder<T> {
private final TransactionalRunner transactionalRunner;
private final Executor executor;
private final ScheduledExecutorService scheduledExecutor;
// Fields constructed during build()
private TransactionalRunner transactionalRunner;
private Executor executor;
private ScheduledExecutorService scheduledExecutor;
// Fields initialized by setters/constructor
private FDBDatabase database;
private FDBRecordContextConfig.Builder contextConfigBuilder;
private final CursorFactory<T> cursorCreator;
private final ItemHandler<T> singleItemHandler;
private Consumer<QuotaManager> transactionSuccessNotification;
Expand All @@ -422,17 +419,10 @@ public static class Builder<T> {
private int maxRecordDeletesPerSec;
private int numOfRetries;

/**
* Constructor.
* @param runner the FDB runner to use when creating transactions
* @param cursorCreator the factory to use when creating the inner cursor
* @param singleItemHandler the handler of a single item while iterating
*/
private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutorService scheduledExecutor, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
// Mandatory fields are set in the constructor. Everything else is optional.
this.transactionalRunner = runner;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.database = database;
this.contextConfigBuilder = contextConfigBuilder;
this.cursorCreator = cursorCreator;
this.singleItemHandler = singleItemHandler;
// set defaults
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest below that you remove the other constructor, but if not, perhaps worth settings these ones in the field declaration, and remove the duplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the other constructor should have been deleted

Expand All @@ -443,14 +433,6 @@ private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutor
this.numOfRetries = NUMBER_OF_RETRIES;
}

private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
this(new TransactionalRunner(database, contextConfigBuilder),
database.newContextExecutor(contextConfigBuilder.getMdcContext()),
database.getScheduledExecutor(),
cursorCreator,
singleItemHandler);
}

/**
* Set the amount of time for each transaction before committing and starting another.
* Defaults to 4000.
Expand Down Expand Up @@ -538,11 +520,27 @@ public Builder<T> withNumOfRetries(int numOfRetries) {
return this;
}

/**
* Set the MDC context for the runner/executor.
* This MDC context will be carried out into the runner and executor and will allow them to pass that down to
* LOGGER calls used by the item handlers.
* Defaults to empty context.
* @param mdcContext the MDC context to use
* @return this builder
*/
public Builder<T> withMdcContext(Map<String, String> mdcContext) {
this.contextConfigBuilder.setMdcContext(mdcContext);
return this;
}

/**
* Create the iterator.
* @return the newly minted iterator
*/
public ThrottledRetryingIterator<T> build() {
this.transactionalRunner = new TransactionalRunner(database, contextConfigBuilder);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the constructor that takes a TransactionalRunner is called, the one provided will not be used here.
That call does not appear to be used, maybe worth just deleting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, deleted (should have been deleted earlier).

this.executor = database.newContextExecutor(contextConfigBuilder.getMdcContext());
this.scheduledExecutor = database.getScheduledExecutor();
return new ThrottledRetryingIterator<>(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.MDC;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
Expand All @@ -51,6 +55,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -718,6 +723,60 @@
Assertions.assertThat(cursor.get().isClosed()).isTrue();
}

private static Stream<Arguments> mdcParams() {
return Stream.of(
Arguments.of("value", 10, 0, 1), // have MDC value, one transaction
Arguments.of(null, 10, 0, 1), // null MDC, one transaction
Arguments.of("value", 10, 6, 2) // have MDC, 2 transactions
);
}

@ParameterizedTest
@MethodSource("mdcParams")
void testMdcContextPropagation(String mdcValue, int numRecords, int deletesPerTransaction, int expectedTransactions) throws Exception {
String mdcKey = "mdckey";
final Map<String, String> original = MDC.getCopyOfContextMap();

try {
// Set MDC context if provided
if (mdcValue != null) {
MDC.clear();
MDC.put(mdcKey, mdcValue);
}
final AtomicInteger transactionCount = new AtomicInteger(0);
final Consumer<ThrottledRetryingIterator.QuotaManager> transactionStart =
quotaManager -> transactionCount.incrementAndGet();

Check warning on line 748 in fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java#L747-L748

Move the declaration of "transactionStart" closer to the code that uses it https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3740%2Fohadzeliger%2Frunner-store-mdc%3AHEAD&id=0B9E539F769C4018772A5E53971D7F96
final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
assertThat(MDC.get(mdcKey)).isEqualTo(mdcValue); // covers null case
quotaManager.deleteCountInc();
return AsyncUtil.DONE;
};

final FDBRecordStore.Builder storeBuilder;
try (FDBRecordContext context = openContext()) {
openSimpleRecordStore(context);
storeBuilder = recordStore.asBuilder();
commit(context);
}

Map<String, String> mdcContext = (mdcValue == null) ? null : MDC.getCopyOfContextMap();
ThrottledRetryingIterator.Builder<Integer> builder =
ThrottledRetryingIterator.builder(fdb, intCursor(numRecords, null), itemHandler)
.withMdcContext(mdcContext);

builder.withMaxRecordsDeletesPerTransaction(deletesPerTransaction)
.withTransactionInitNotification(transactionStart);

try (ThrottledRetryingIterator<Integer> throttledIterator = builder.build()) {
throttledIterator.iterateAll(storeBuilder).join();
}

assertThat(transactionCount.get()).isEqualTo(expectedTransactions);
} finally {
MDC.setContextMap(original);
}
}

private ThrottledRetryingIterator.Builder<Integer> iteratorBuilder(final int numRecords,
final ItemHandler<Integer> itemHandler,
final Consumer<ThrottledRetryingIterator.QuotaManager> initNotification,
Expand Down
Loading