diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 575a8b375e3..a3a0da7ffe4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.bookie.storage.EntryLogScanner.ReadLengthType.READ_LEDGER_ENTRY_ID; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -1036,16 +1037,35 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce } // read the entry data.clear(); - data.capacity(entrySize); - int rc = readFromLogChannel(entryLogId, bc, data, pos); - if (rc != entrySize) { - LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})", - entryLogId, pos, rc, entrySize); - return; + int rc; + // process the entry based on the read length type + switch (scanner.getReadLengthType()) { + case READ_NOTHING: + // skip read + scanner.process(ledgerId, offset, entrySize); + break; + case READ_LEDGER_ENTRY_ID: + data.capacity(READ_LEDGER_ENTRY_ID.getLengthToRead()); + rc = readFromLogChannel(entryLogId, bc, data, pos); + if (rc != READ_LEDGER_ENTRY_ID.getLengthToRead()) { + LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId); + return; + } + long entryId = data.getLong(Long.BYTES); + scanner.process(ledgerId, offset, entrySize, entryId); + break; + case READ_ALL: + data.capacity(entrySize); + rc = readFromLogChannel(entryLogId, bc, data, pos); + if (rc != entrySize) { + LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId); + return; + } + scanner.process(ledgerId, offset, data); + break; + default: + throw new IOException("Unknown read length type " + scanner.getReadLengthType()); } - // process the entry - scanner.process(ledgerId, offset, data); - // Advance position to the next entry pos += entrySize; } @@ -1164,18 +1184,21 @@ private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId, // Read through the entry log file and extract the entry log meta scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { - if (throttler != null) { - throttler.acquire(entry.readableBytes()); - } + public void process(long ledgerId, long offset, int entrySize) throws IOException { // add new entry size of a ledger to entry log meta - meta.addLedgerSize(ledgerId, entry.readableBytes() + 4); + meta.addLedgerSize(ledgerId, entrySize + 4); } @Override public boolean accept(long ledgerId) { return ledgerId >= 0; } + + @Override + public ReadLengthType getReadLengthType(){ + // we only need to read the entry size. + return ReadLengthType.READ_NOTHING; + } }); if (LOG.isDebugEnabled()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index a05971a021f..0475819c48a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -114,9 +114,7 @@ public void initiate(boolean dryRun) throws IOException { LOG.info("Scanning {}", entryLogId); entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { - long entryId = entry.getLong(8); - + public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId); // Actual location indexed is pointing past the entry size @@ -138,6 +136,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio public boolean accept(long ledgerId) { return ledgerIds.contains(ledgerId); } + + @Override + public ReadLengthType getReadLengthType() { + return ReadLengthType.READ_LEDGER_ENTRY_ID; + } }); ledgerCache.flushLedger(true); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 2b6fca30c10..075052166bc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -315,17 +315,19 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { - long lid = entry.getLong(entry.readerIndex()); - long entryId = entry.getLong(entry.readerIndex() + 8); - if (lid != ledgerId || entryId < -1) { - LOG.warn("Scanning expected ledgerId {}, but found invalid entry " - + "with ledgerId {} entryId {} at offset {}", - ledgerId, lid, entryId, offset); + public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { + if (entryId < -1) { + LOG.warn("Scanning found invalid entry with ledgerId {} entryId {} at offset {}", + ledgerId, entryId, offset); throw new IOException("Invalid entry found @ offset " + offset); } long location = (compactionLog.getDstLogId() << 32L) | (offset + 4); - offsets.add(new EntryLocation(lid, entryId, location)); + offsets.add(new EntryLocation(ledgerId, entryId, location)); + } + + @Override + public ReadLengthType getReadLengthType() { + return ReadLengthType.READ_LEDGER_ENTRY_ID; } }); LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index 9305b6bef04..091a3261913 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -25,28 +25,76 @@ /** * Scan entries in a entry log file. + * Implementation for this interface should choose one of the following ReadLengthType: + * READ_ALL, READ_NOTHING, READ_LEDGER_ENTRY_ID_LENGTH.
+ * If the implementation chooses READ_ALL, it should implement {@link #process(long, long, ByteBuf)}.
+ * If the implementation chooses READ_NOTHING, it should implement {@link #process(long, long, int)}.
+ * If the implementation chooses READ_LEDGER_ENTRY_ID_LENGTH, it should implement {@link #process(long, long, int, long)}.
+ * */ public interface EntryLogScanner { + enum ReadLengthType { + // Read all data of the entry + READ_ALL(Integer.MAX_VALUE), + // Read nothing of the entry + READ_NOTHING(0), + // Read ledger id(8 byte) and entry id(8 byte) in the beginning of the entry + READ_LEDGER_ENTRY_ID(16); + + private final int lengthToRead; + ReadLengthType(int lengthToRead) { + this.lengthToRead = lengthToRead; + } + + public int getLengthToRead() { + return lengthToRead; + } + } + /** * Tests whether or not the entries belongs to the specified ledger * should be processed. * - * @param ledgerId - * Ledger ID. + * @param ledgerId ledger id * @return true if and only the entries of the ledger should be scanned. */ boolean accept(long ledgerId); /** - * Process an entry. - * - * @param ledgerId - * Ledger ID. - * @param offset - * File offset of this entry. - * @param entry - * Entry ByteBuf + * Process an entry when ReadLengthType is READ_NOTHING. + * @param ledgerId ledger id + * @param offset init offset of the entry + * @param entrySize entry size + * @throws IOException + */ + default void process(long ledgerId, long offset, int entrySize) throws IOException { + throw new UnsupportedOperationException("Not implemented when ReadLengthType is READ_NOTHING"); + } + + /** + * Process an entry when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH. + * @param ledgerId ledger id + * @param offset init offset of the entry + * @param entrySize entry size + * @param entryId entry id * @throws IOException */ - void process(long ledgerId, long offset, ByteBuf entry) throws IOException; + default void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { + throw new UnsupportedOperationException("Not implemented when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH"); + } + + /** + * Process an entry when ReadLengthType is READ_ALL. + * @param ledgerId ledger id + * @param offset init offset of the entry + * @param entry entry, containing ledgerId(8byte), entryId(8byte),... entrySize=entry.readableBytes() + */ + default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{ + throw new UnsupportedOperationException("Not implemented when ReadLengthType is READ_ALL"); + } + + + default ReadLengthType getReadLengthType(){ + return ReadLengthType.READ_ALL; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 323727217d6..e0e71f72be7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -423,18 +423,20 @@ EntryLogMetadata scanEntryLogMetadata(long logId, AbstractLogCompactor.Throttler // Read through the entry log file and extract the entry log meta scanEntryLog(logId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, int entrySize) throws IOException { // add new entry size of a ledger to entry log meta - if (throttler != null) { - throttler.acquire(entry.readableBytes()); - } - meta.addLedgerSize(ledgerId, entry.readableBytes() + Integer.BYTES); + meta.addLedgerSize(ledgerId, entrySize + Integer.BYTES); } @Override public boolean accept(long ledgerId) { return ledgerId >= 0; } + + @Override + public ReadLengthType getReadLengthType() { + return ReadLengthType.READ_NOTHING; + } }); return meta; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java index 9718795143d..fa0aef25e09 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java @@ -26,6 +26,8 @@ import java.io.IOException; import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import static org.apache.bookkeeper.bookie.storage.directentrylogger.LogMetadata.INVALID_LID; + class LogReaderScan { static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner scanner) throws IOException { int offset = Header.LOGFILE_LEGACY_HEADER_SIZE; @@ -47,11 +49,27 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s // have realigned on the block boundary. offset += Integer.BYTES; + long ledgerId = reader.readLongAt(offset); + if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) { + offset += entrySize; + continue; + } + entry.clear(); - reader.readIntoBufferAt(entry, offset, entrySize); - long ledgerId = entry.getLong(0); - if (ledgerId >= 0 && scanner.accept(ledgerId)) { - scanner.process(ledgerId, initOffset, entry); + switch (scanner.getReadLengthType()) { + case READ_NOTHING: + scanner.process(ledgerId, initOffset, entrySize); + break; + case READ_LEDGER_ENTRY_ID: + long entryId = reader.readLongAt(offset + Long.BYTES); + scanner.process(ledgerId, initOffset, entrySize, entryId); + break; + case READ_ALL: + reader.readIntoBufferAt(entry, offset, entrySize); + scanner.process(ledgerId, initOffset, entry); + break; + default: + throw new IOException("Unknown read length type: " + scanner.getReadLengthType()); } offset += entrySize; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index 2725897e804..fcbd4770da5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -162,7 +162,7 @@ private void scanEntryLogFiles(Set ledgers, File[] lDirs) throws IOExcepti for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, int entrySize) throws IOException { if (ledgers.add(ledgerId)) { if (verbose) { LOG.info("Found ledger {} in entry log", ledgerId); @@ -174,6 +174,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio public boolean accept(long ledgerId) { return true; } + + @Override + public ReadLengthType getReadLengthType() { + return ReadLengthType.READ_NOTHING; + } }); ++completedEntryLogs; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index b9aaebec9e2..af901fde95b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -21,7 +21,7 @@ package org.apache.bookkeeper.bookie.storage.ldb; import com.google.common.collect.Sets; -import io.netty.buffer.ByteBuf; + import java.io.File; import java.io.IOException; import java.nio.file.FileSystems; @@ -100,9 +100,7 @@ public void initiate() throws IOException { for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { - long entryId = entry.getLong(8); - + public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { // Actual location indexed is pointing past the entry size long location = (entryLogId << 32L) | (offset + 4); @@ -135,6 +133,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio public boolean accept(long ledgerId) { return activeLedgers.contains(ledgerId); } + + @Override + public ReadLengthType getReadLengthType() { + return ReadLengthType.READ_LEDGER_ENTRY_ID; + } }); ++completedEntryLogs;