Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr
@Nullable final LucenePrimaryKeySegmentIndex segmentIndex = directoryManager.getDirectory(groupingKey, partitionId).getPrimaryKeySegmentIndex();

if (segmentIndex != null) {
final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId);
final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey);
final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = getDocumentIndexEntryWithRetry(segmentIndex, groupingKey, partitionId, primaryKey);
if (documentIndexEntry != null) {
state.context.ensureActive().clear(documentIndexEntry.entryKey); // TODO: Only if valid?
long valid = indexWriter.tryDeleteDocument(documentIndexEntry.indexReader, documentIndexEntry.docId);
Expand Down Expand Up @@ -360,6 +359,32 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr
return 0;
}

/**
* Try to find the document for the given record in the segment index.
* This method would first try to find the document using the existing reader. If it can't, it will refresh the reader
* and try again. The incentive for this is when the documents have been updated in memory (e.g. in the same transaction), the
* writer may cache the changes in NRT and the reader (created before the updates) can't see them. Refreshing the reader from the
* writer can alleviate this by re-reading the changes in the NRT.
* If the index can't find the document with the refreshed reader, null is returned.
* @param groupingKey the grouping key for the index
* @param partitionId the partition ID for the index
* @param primaryKey the record primary key to look for
* @return segment index entry if the record was found, null if none
* @throws IOException in case of error
*/
@SuppressWarnings("PMD.CloseResource")
private LucenePrimaryKeySegmentIndex.DocumentIndexEntry getDocumentIndexEntryWithRetry(LucenePrimaryKeySegmentIndex segmentIndex, final Tuple groupingKey, final Integer partitionId, final Tuple primaryKey) throws IOException {
DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, false);
LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey);
if (documentIndexEntry != null) {
return documentIndexEntry;
} else {
// Use refresh to ensure the reader can see the latest deletes
directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true);
return segmentIndex.findDocument(directoryReader, primaryKey);
}
}

@Override
public CompletableFuture<Void> mergeIndex() {
return rebalancePartitions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.TupleHelpers;
import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -293,7 +294,8 @@ public void invalidatePrefix(@Nonnull Tuple prefix) {
}
}

private FDBDirectoryWrapper getDirectoryWrapper(@Nullable Tuple groupingKey, @Nullable Integer partitionId) {
@VisibleForTesting
public FDBDirectoryWrapper getDirectoryWrapper(@Nullable Tuple groupingKey, @Nullable Integer partitionId) {
return getDirectoryWrapper(groupingKey, partitionId, getAgilityContext(false, false));
}

Expand Down Expand Up @@ -368,7 +370,11 @@ public IndexWriter getIndexWriter(@Nullable Tuple groupingKey, @Nullable Integer
}

public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId) throws IOException {
return getDirectoryWrapper(groupingKey, partititonId).getWriterReader();
return getWriterReader(groupingKey, partititonId, false);
}

public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId, boolean refresh) throws IOException {
return getDirectoryWrapper(groupingKey, partititonId).getWriterReader(refresh);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.lucene.directory;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.lucene.LuceneAnalyzerWrapper;
import com.apple.foundationdb.record.lucene.LuceneEvents;
import com.apple.foundationdb.record.lucene.LuceneLoggerInfoStream;
Expand All @@ -30,6 +31,8 @@
import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.util.CloseException;
import com.apple.foundationdb.util.CloseableUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.ConcurrentMergeScheduler;
Expand All @@ -50,6 +53,8 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;

/**
Expand All @@ -58,6 +63,7 @@
* {@link FDBDirectory} contains cached information from FDB, it is important for cache coherency that all writers
* (etc.) accessing that directory go through the same wrapper object so that they share a common cache.
*/
@API(API.Status.INTERNAL)
public class FDBDirectoryWrapper implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(FDBDirectoryWrapper.class);

Expand Down Expand Up @@ -91,8 +97,13 @@ public class FDBDirectoryWrapper implements AutoCloseable {
* predominately used by the {@link com.apple.foundationdb.record.lucene.LucenePrimaryKeySegmentIndex} to find the
* segments associated with documents being deleted.
*/
private final LazyCloseable<DirectoryReader> writerReader;

private LazyCloseable<DirectoryReader> writerReader;
/**
* WriterReaders that were replaced (through {@link #getWriterReader(boolean)} )} with a {@code refresh==true}).
* These readers should all be closed, but they may still be in use while this class is in circulation, so their
* closure is postponed until this class' {@link #close()} call.
*/
private Queue<LazyCloseable<DirectoryReader>> readersToClose;

FDBDirectoryWrapper(@Nonnull final IndexMaintainerState state,
@Nonnull final Tuple key,
Expand All @@ -109,6 +120,7 @@ public class FDBDirectoryWrapper implements AutoCloseable {
this.analyzerWrapper = analyzerWrapper;
writer = LazyCloseable.supply(() -> createIndexWriter(exceptionAtCreation));
writerReader = LazyCloseable.supply(() -> DirectoryReader.open(writer.get()));
readersToClose = new ConcurrentLinkedQueue<>();
}

@VisibleForTesting
Expand Down Expand Up @@ -202,9 +214,18 @@ public IndexReader getReader() throws IOException {
/**
* Get a {@link DirectoryReader} wrapped around the {@link #getWriter()} to be able to get segments associated with
* documents. This resource will be closed when {@code this} is closed, and should not be closed by callers
* @param refresh if TRUE will try to refresh the reader data from the writer
*/
@SuppressWarnings("PMD.CloseResource")
public DirectoryReader getWriterReader() throws IOException {
public DirectoryReader getWriterReader(boolean refresh) throws IOException {
if (refresh) {
final DirectoryReader newReader = DirectoryReader.openIfChanged(writerReader.get());
if (newReader != null) {
// previous reader instantiated but then writer changed
readersToClose.add(writerReader);
writerReader = LazyCloseable.supply(() -> newReader);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is touched concurrently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test to update the readers concurrently

}
}
return writerReader.get();
}

Expand Down Expand Up @@ -359,9 +380,19 @@ public IndexWriter getWriter() throws IOException {
@SuppressWarnings("PMD.CloseResource")
public synchronized void close() throws IOException {
IOUtils.close(writer, writerReader, directory);
try {
CloseableUtils.closeAll(readersToClose.toArray(new LazyCloseable<?>[0]));
} catch (CloseException e) {
throw new IOException(e);
}
}

public void mergeIndex() throws IOException {
getWriter().maybeMerge();
}

@VisibleForTesting
public Queue<LazyCloseable<DirectoryReader>> getReadersToClose() {
return readersToClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import com.apple.foundationdb.record.TestRecordsTextProto;
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.lucene.codec.LazyCloseable;
import com.apple.foundationdb.record.lucene.directory.AgilityContext;
import com.apple.foundationdb.record.lucene.directory.FDBDirectory;
import com.apple.foundationdb.record.lucene.directory.FDBDirectoryLockFactory;
import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager;
import com.apple.foundationdb.record.lucene.directory.FDBDirectoryWrapper;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.provider.common.FixedZeroKeyManager;
Expand Down Expand Up @@ -60,6 +62,9 @@
import com.apple.test.SuperSlow;
import com.apple.test.Tags;
import com.apple.test.TestConfigurationUtils;
import com.google.common.collect.Streams;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Lock;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
Expand All @@ -78,18 +83,21 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
Expand Down Expand Up @@ -123,6 +131,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests of the consistency of the Lucene Index.
Expand Down Expand Up @@ -1099,6 +1108,124 @@
assertThat(partitionCounts, contains(0)));
}

static Stream<Arguments> multiUpdate() {
Stream<Long> seeds = Streams.concat(
Stream.of(5365L),
TestConfigurationUtils.onlyNightly(RandomizedTestUtils.randomSeeds(6664, 76778)));
return ParameterizedTestUtils.cartesianProduct(
ParameterizedTestUtils.booleans("isSynthetic"),
ParameterizedTestUtils.booleans("isGrouped"),
Stream.of(0, 10),
Stream.of(0, 1, 4),
seeds);
}

@ParameterizedTest
@MethodSource("multiUpdate")
void multipleUpdatesInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException {
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
.setIsGrouped(isGrouped)
.setIsSynthetic(isSynthetic)
.setPrimaryKeySegmentIndexEnabled(true)
.setPartitionHighWatermark(highWatermark)
.build();

final int documentCount = 10 + dataModel.getRandom().nextInt(10);

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();

// save records
try (FDBRecordContext context = openContext(contextProps)) {
dataModel.saveRecordsToAllGroups(documentCount, context);
commit(context);
}

try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);
dataModel.sampleRecordsUnderTest().forEach(rec -> {
for (int i = 0; i < updateCount; i++) {
// update some documents multiple times
rec.updateOtherValue(store).join();
}
});
commit(context);
}
explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);

if (highWatermark > 0) {
// ensure each partition has all records
dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) ->
assertThat(partitionCounts.stream().mapToInt(i -> i).sum(), Matchers.equalTo(documentCount)));
}

explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
dataModel.validate(() -> openContext(contextProps));
}

/**
* Test that the DirectoryWrapper accounts for all the created WriterReaders.
* Create multiple ReaderWriters in concurrent threads and ensure they are all closed.
*/
@Test
void testCreateReadersConcurrently() throws IOException {
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(1, this::getStoreBuilder, pathManager)
.setIsGrouped(true)
.setIsSynthetic(true)
.setPrimaryKeySegmentIndexEnabled(true)
.setPartitionHighWatermark(0) // no partitioning, so we only have one partition to work with
.build();
final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();

Queue<IndexReader> createdReaders = new ConcurrentLinkedQueue<>();
Set<DirectoryReader> actualReaders;
Tuple groupingKey = Tuple.from(1);
Integer partitionId = null;
int threads = 10;
int loops = 10;
try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);
final LuceneIndexMaintainer indexMaintainer = getIndexMaintainer(store, dataModel.index);
final FDBDirectoryManager directoryManager = indexMaintainer.getDirectoryManager();

List<CompletableFuture<Void>> futures = new ArrayList<>(threads);
for (int i = 0; i < threads; i++) {
futures.add(CompletableFuture.supplyAsync(() -> {
try {
for (int j = 0; j < loops; j++) {
// Cause the reader to become stale
dataModel.saveRecord(store, 1);
// Refresh the reader - this should create a new one
DirectoryReader writerReader = directoryManager.getWriterReader(groupingKey, partitionId, true);
// Store the created reader for later
createdReaders.add(writerReader);

Check warning on line 1206 in fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java#L1202-L1206

[New] This method is slightly nested [0]. Consider extracting helper methods or reducing the nesting by using early breaks or returns. [0] https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3722%2Fohadzeliger%2Flucene-multiple-updates%3AHEAD&id=813C358A7B9A26998561F5EED7F773E4
}
return null;
} catch (IOException e) {
throw new RuntimeException(e);
}
}, context.getExecutor()));

Check warning on line 1212 in fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java#L1198-L1212

[New] Method always returns the same value (null) https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3722%2Fohadzeliger%2Flucene-multiple-updates%3AHEAD&id=2400A78DD8110AB9A55E2259C519D57D
}
AsyncUtil.getAll(futures).join();
Stream<DirectoryReader> stream = directoryManager.getDirectoryWrapper(groupingKey, partitionId).getReadersToClose()
.stream().map(LazyCloseable::getUnchecked);
// add the existing reader (that is not at the readersToClose)
stream = Streams.concat(stream,
Stream.of(directoryManager.getWriterReader(groupingKey, partitionId)));
actualReaders = stream.collect(Collectors.toSet());
commit(context);
}

// assert that each captured reader is included in the directory wrapper's list
createdReaders.forEach(createdReader ->
assertTrue(actualReaders.contains(createdReader)));
}

static Stream<Arguments> changingEncryptionKey() {
return Stream.concat(Stream.of(Arguments.of(true, true, 288513),
Arguments.of(false, false, 792025)),
Expand Down Expand Up @@ -1584,4 +1711,9 @@
private <T> Consumer<T> noopConsumer() {
return ignored -> { };
}

@Nonnull
protected LuceneIndexMaintainer getIndexMaintainer(FDBRecordStore store, Index index) {
return (LuceneIndexMaintainer)store.getIndexMaintainer(index);
}
}
Loading