diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index 052276254f..23b105a4a8 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -308,8 +308,7 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr @Nullable final LucenePrimaryKeySegmentIndex segmentIndex = directoryManager.getDirectory(groupingKey, partitionId).getPrimaryKeySegmentIndex(); if (segmentIndex != null) { - final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId); - final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); + final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = getDocumentIndexEntryWithRetry(segmentIndex, groupingKey, partitionId, primaryKey); if (documentIndexEntry != null) { state.context.ensureActive().clear(documentIndexEntry.entryKey); // TODO: Only if valid? long valid = indexWriter.tryDeleteDocument(documentIndexEntry.indexReader, documentIndexEntry.docId); @@ -360,6 +359,32 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr return 0; } + /** + * Try to find the document for the given record in the segment index. + * This method would first try to find the document using the existing reader. If it can't, it will refresh the reader + * and try again. The incentive for this is when the documents have been updated in memory (e.g. in the same transaction), the + * writer may cache the changes in NRT and the reader (created before the updates) can't see them. Refreshing the reader from the + * writer can alleviate this by re-reading the changes in the NRT. + * If the index can't find the document with the refreshed reader, null is returned. + * @param groupingKey the grouping key for the index + * @param partitionId the partition ID for the index + * @param primaryKey the record primary key to look for + * @return segment index entry if the record was found, null if none + * @throws IOException in case of error + */ + @SuppressWarnings("PMD.CloseResource") + private LucenePrimaryKeySegmentIndex.DocumentIndexEntry getDocumentIndexEntryWithRetry(LucenePrimaryKeySegmentIndex segmentIndex, final Tuple groupingKey, final Integer partitionId, final Tuple primaryKey) throws IOException { + DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, false); + LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); + if (documentIndexEntry != null) { + return documentIndexEntry; + } else { + // Use refresh to ensure the reader can see the latest deletes + directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true); + return segmentIndex.findDocument(directoryReader, primaryKey); + } + } + @Override public CompletableFuture mergeIndex() { return rebalancePartitions() diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java index 04a8356de7..30f3c25f19 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java @@ -52,6 +52,7 @@ import com.apple.foundationdb.subspace.Subspace; import com.apple.foundationdb.tuple.Tuple; import com.apple.foundationdb.tuple.TupleHelpers; +import com.google.common.annotations.VisibleForTesting; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; @@ -293,7 +294,8 @@ public void invalidatePrefix(@Nonnull Tuple prefix) { } } - private FDBDirectoryWrapper getDirectoryWrapper(@Nullable Tuple groupingKey, @Nullable Integer partitionId) { + @VisibleForTesting + public FDBDirectoryWrapper getDirectoryWrapper(@Nullable Tuple groupingKey, @Nullable Integer partitionId) { return getDirectoryWrapper(groupingKey, partitionId, getAgilityContext(false, false)); } @@ -368,7 +370,11 @@ public IndexWriter getIndexWriter(@Nullable Tuple groupingKey, @Nullable Integer } public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId) throws IOException { - return getDirectoryWrapper(groupingKey, partititonId).getWriterReader(); + return getWriterReader(groupingKey, partititonId, false); + } + + public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId, boolean refresh) throws IOException { + return getDirectoryWrapper(groupingKey, partititonId).getWriterReader(refresh); } @Nonnull diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java index 23306cdeed..6366a7725f 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.lucene.directory; +import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.record.lucene.LuceneAnalyzerWrapper; import com.apple.foundationdb.record.lucene.LuceneEvents; import com.apple.foundationdb.record.lucene.LuceneLoggerInfoStream; @@ -30,6 +31,8 @@ import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState; import com.apple.foundationdb.subspace.Subspace; import com.apple.foundationdb.tuple.Tuple; +import com.apple.foundationdb.util.CloseException; +import com.apple.foundationdb.util.CloseableUtils; import com.google.common.annotations.VisibleForTesting; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.ConcurrentMergeScheduler; @@ -50,6 +53,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadLocalRandom; /** @@ -58,6 +63,7 @@ * {@link FDBDirectory} contains cached information from FDB, it is important for cache coherency that all writers * (etc.) accessing that directory go through the same wrapper object so that they share a common cache. */ +@API(API.Status.INTERNAL) public class FDBDirectoryWrapper implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(FDBDirectoryWrapper.class); @@ -91,8 +97,13 @@ public class FDBDirectoryWrapper implements AutoCloseable { * predominately used by the {@link com.apple.foundationdb.record.lucene.LucenePrimaryKeySegmentIndex} to find the * segments associated with documents being deleted. */ - private final LazyCloseable writerReader; - + private LazyCloseable writerReader; + /** + * WriterReaders that were replaced (through {@link #getWriterReader(boolean)} )} with a {@code refresh==true}). + * These readers should all be closed, but they may still be in use while this class is in circulation, so their + * closure is postponed until this class' {@link #close()} call. + */ + private Queue> readersToClose; FDBDirectoryWrapper(@Nonnull final IndexMaintainerState state, @Nonnull final Tuple key, @@ -109,6 +120,7 @@ public class FDBDirectoryWrapper implements AutoCloseable { this.analyzerWrapper = analyzerWrapper; writer = LazyCloseable.supply(() -> createIndexWriter(exceptionAtCreation)); writerReader = LazyCloseable.supply(() -> DirectoryReader.open(writer.get())); + readersToClose = new ConcurrentLinkedQueue<>(); } @VisibleForTesting @@ -202,9 +214,18 @@ public IndexReader getReader() throws IOException { /** * Get a {@link DirectoryReader} wrapped around the {@link #getWriter()} to be able to get segments associated with * documents. This resource will be closed when {@code this} is closed, and should not be closed by callers + * @param refresh if TRUE will try to refresh the reader data from the writer */ @SuppressWarnings("PMD.CloseResource") - public DirectoryReader getWriterReader() throws IOException { + public DirectoryReader getWriterReader(boolean refresh) throws IOException { + if (refresh) { + final DirectoryReader newReader = DirectoryReader.openIfChanged(writerReader.get()); + if (newReader != null) { + // previous reader instantiated but then writer changed + readersToClose.add(writerReader); + writerReader = LazyCloseable.supply(() -> newReader); + } + } return writerReader.get(); } @@ -359,9 +380,19 @@ public IndexWriter getWriter() throws IOException { @SuppressWarnings("PMD.CloseResource") public synchronized void close() throws IOException { IOUtils.close(writer, writerReader, directory); + try { + CloseableUtils.closeAll(readersToClose.toArray(new LazyCloseable[0])); + } catch (CloseException e) { + throw new IOException(e); + } } public void mergeIndex() throws IOException { getWriter().maybeMerge(); } + + @VisibleForTesting + public Queue> getReadersToClose() { + return readersToClose; + } } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index ca8c2f3844..755e7f375e 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -30,9 +30,11 @@ import com.apple.foundationdb.record.TestRecordsTextProto; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; +import com.apple.foundationdb.record.lucene.codec.LazyCloseable; import com.apple.foundationdb.record.lucene.directory.AgilityContext; import com.apple.foundationdb.record.lucene.directory.FDBDirectory; import com.apple.foundationdb.record.lucene.directory.FDBDirectoryLockFactory; +import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager; import com.apple.foundationdb.record.lucene.directory.FDBDirectoryWrapper; import com.apple.foundationdb.record.metadata.Index; import com.apple.foundationdb.record.provider.common.FixedZeroKeyManager; @@ -60,6 +62,9 @@ import com.apple.test.SuperSlow; import com.apple.test.Tags; import com.apple.test.TestConfigurationUtils; +import com.google.common.collect.Streams; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.store.Lock; import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; @@ -78,6 +83,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -85,11 +91,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; @@ -123,6 +131,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests of the consistency of the Lucene Index. @@ -1099,6 +1108,124 @@ void randomlyRemoveAllRecords() throws IOException { assertThat(partitionCounts, contains(0))); } + static Stream multiUpdate() { + Stream seeds = Streams.concat( + Stream.of(5365L), + TestConfigurationUtils.onlyNightly(RandomizedTestUtils.randomSeeds(6664, 76778))); + return ParameterizedTestUtils.cartesianProduct( + ParameterizedTestUtils.booleans("isSynthetic"), + ParameterizedTestUtils.booleans("isGrouped"), + Stream.of(0, 10), + Stream.of(0, 1, 4), + seeds); + } + + @ParameterizedTest + @MethodSource("multiUpdate") + void multipleUpdatesInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException { + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(isGrouped) + .setIsSynthetic(isSynthetic) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(highWatermark) + .build(); + + final int documentCount = 10 + dataModel.getRandom().nextInt(10); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + // save records + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.saveRecordsToAllGroups(documentCount, context); + commit(context); + } + + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.sampleRecordsUnderTest().forEach(rec -> { + for (int i = 0; i < updateCount; i++) { + // update some documents multiple times + rec.updateOtherValue(store).join(); + } + }); + commit(context); + } + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + if (highWatermark > 0) { + // ensure each partition has all records + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts.stream().mapToInt(i -> i).sum(), Matchers.equalTo(documentCount))); + } + + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + dataModel.validate(() -> openContext(contextProps)); + } + + /** + * Test that the DirectoryWrapper accounts for all the created WriterReaders. + * Create multiple ReaderWriters in concurrent threads and ensure they are all closed. + */ + @Test + void testCreateReadersConcurrently() throws IOException { + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(1, this::getStoreBuilder, pathManager) + .setIsGrouped(true) + .setIsSynthetic(true) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(0) // no partitioning, so we only have one partition to work with + .build(); + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + Queue createdReaders = new ConcurrentLinkedQueue<>(); + Set actualReaders; + Tuple groupingKey = Tuple.from(1); + Integer partitionId = null; + int threads = 10; + int loops = 10; + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + final LuceneIndexMaintainer indexMaintainer = getIndexMaintainer(store, dataModel.index); + final FDBDirectoryManager directoryManager = indexMaintainer.getDirectoryManager(); + + List> futures = new ArrayList<>(threads); + for (int i = 0; i < threads; i++) { + futures.add(CompletableFuture.supplyAsync(() -> { + try { + for (int j = 0; j < loops; j++) { + // Cause the reader to become stale + dataModel.saveRecord(store, 1); + // Refresh the reader - this should create a new one + DirectoryReader writerReader = directoryManager.getWriterReader(groupingKey, partitionId, true); + // Store the created reader for later + createdReaders.add(writerReader); + } + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + }, context.getExecutor())); + } + AsyncUtil.getAll(futures).join(); + Stream stream = directoryManager.getDirectoryWrapper(groupingKey, partitionId).getReadersToClose() + .stream().map(LazyCloseable::getUnchecked); + // add the existing reader (that is not at the readersToClose) + stream = Streams.concat(stream, + Stream.of(directoryManager.getWriterReader(groupingKey, partitionId))); + actualReaders = stream.collect(Collectors.toSet()); + commit(context); + } + + // assert that each captured reader is included in the directory wrapper's list + createdReaders.forEach(createdReader -> + assertTrue(actualReaders.contains(createdReader))); + } + static Stream changingEncryptionKey() { return Stream.concat(Stream.of(Arguments.of(true, true, 288513), Arguments.of(false, false, 792025)), @@ -1584,4 +1711,9 @@ protected RecordLayerPropertyStorage.Builder addDefaultProps(final RecordLayerPr private Consumer noopConsumer() { return ignored -> { }; } + + @Nonnull + protected LuceneIndexMaintainer getIndexMaintainer(FDBRecordStore store, Index index) { + return (LuceneIndexMaintainer)store.getIndexMaintainer(index); + } }