From 734126ef19dc16b3db8737dcc75db0a79889f5a0 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 16:28:04 +0800 Subject: [PATCH 01/13] add support for specifying LengthToRead. --- .../bookkeeper/bookie/DefaultEntryLogger.java | 29 +++++++++++++------ .../bookkeeper/bookie/EntryLogCompactor.java | 2 +- .../InterleavedStorageRegenerateIndexOp.java | 7 ++++- .../TransactionalEntryLogCompactor.java | 9 ++++-- .../bookie/storage/EntryLogScanner.java | 11 ++++++- .../directentrylogger/DirectEntryLogger.java | 9 ++++-- .../storage/ldb/LedgersIndexRebuildOp.java | 7 ++++- .../storage/ldb/LocationsIndexRebuildOp.java | 7 ++++- .../cli/commands/bookie/ReadLogCommand.java | 7 ++--- 9 files changed, 66 insertions(+), 22 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 575a8b375e3..64d74b0faac 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1036,15 +1036,20 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce } // read the entry data.clear(); - data.capacity(entrySize); - int rc = readFromLogChannel(entryLogId, bc, data, pos); - if (rc != entrySize) { - LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})", - entryLogId, pos, rc, entrySize); - return; + int capacity = Math.min(scanner.getLengthToRead(), entrySize); + // skip read when scanner.getLengthToRead() == 0. + if (capacity > 0) { + data.capacity(capacity); + int rc = readFromLogChannel(entryLogId, bc, data, pos); + if (rc != entrySize) { + LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})", + entryLogId, pos, rc, entrySize); + return; + } } + // process the entry - scanner.process(ledgerId, offset, data); + scanner.process(ledgerId, offset, data, entrySize); // Advance position to the next entry pos += entrySize; @@ -1164,18 +1169,24 @@ private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId, // Read through the entry log file and extract the entry log meta scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { if (throttler != null) { throttler.acquire(entry.readableBytes()); } // add new entry size of a ledger to entry log meta - meta.addLedgerSize(ledgerId, entry.readableBytes() + 4); + meta.addLedgerSize(ledgerId, entrySize + 4); } @Override public boolean accept(long ledgerId) { return ledgerId >= 0; } + + @Override + public int getLengthToRead(){ + // we only need to read the entry size. + return EntryLogScanner.READ_NOTHING; + } }); if (LOG.isDebugEnabled()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java index 81cd463761a..c669b8e77d0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java @@ -91,7 +91,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(final long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(final long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { throttler.acquire(entry.readableBytes()); if (offsets.size() > maxOutstandingRequests) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index a05971a021f..ec90507fe74 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -114,7 +114,7 @@ public void initiate(boolean dryRun) throws IOException { LOG.info("Scanning {}", entryLogId); entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { long entryId = entry.getLong(8); stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId); @@ -138,6 +138,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio public boolean accept(long ledgerId) { return ledgerIds.contains(ledgerId); } + + @Override + public int getLengthToRead() { + return EntryLogScanner.READ_ENTRY_ID; + } }); ledgerCache.flushLedger(true); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 2b6fca30c10..aba038ff357 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -169,7 +169,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { throttler.acquire(entry.readableBytes()); synchronized (TransactionalEntryLogCompactor.this) { long lid = entry.getLong(entry.readerIndex()); @@ -315,7 +315,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { long lid = entry.getLong(entry.readerIndex()); long entryId = entry.getLong(entry.readerIndex() + 8); if (lid != ledgerId || entryId < -1) { @@ -327,6 +327,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio long location = (compactionLog.getDstLogId() << 32L) | (offset + 4); offsets.add(new EntryLocation(lid, entryId, location)); } + + @Override + public int getLengthToRead() { + return EntryLogScanner.READ_LEDGER_ENTRY_ID + } }); LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId()); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index 9305b6bef04..b7ff748c72d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -27,6 +27,11 @@ * Scan entries in a entry log file. */ public interface EntryLogScanner { + public static final int READ_ALL = Integer.MAX_VALUE; + public static final int READ_NOTHING = 0; + public static final int READ_ENTRY_ID = 8; + public static final int READ_LEDGER_ENTRY_ID = 16; + /** * Tests whether or not the entries belongs to the specified ledger * should be processed. @@ -48,5 +53,9 @@ public interface EntryLogScanner { * Entry ByteBuf * @throws IOException */ - void process(long ledgerId, long offset, ByteBuf entry) throws IOException; + void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException; + + default int getLengthToRead(){ + return READ_ALL; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 323727217d6..24f4a86d503 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -423,18 +423,23 @@ EntryLogMetadata scanEntryLogMetadata(long logId, AbstractLogCompactor.Throttler // Read through the entry log file and extract the entry log meta scanEntryLog(logId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { // add new entry size of a ledger to entry log meta if (throttler != null) { throttler.acquire(entry.readableBytes()); } - meta.addLedgerSize(ledgerId, entry.readableBytes() + Integer.BYTES); + meta.addLedgerSize(ledgerId, entrySize + Integer.BYTES); } @Override public boolean accept(long ledgerId) { return ledgerId >= 0; } + + @Override + public int getLengthToRead() { + return EntryLogScanner.READ_NOTHING; + } }); return meta; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index 2725897e804..f166a9b1ea1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -162,7 +162,7 @@ private void scanEntryLogFiles(Set ledgers, File[] lDirs) throws IOExcepti for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { if (ledgers.add(ledgerId)) { if (verbose) { LOG.info("Found ledger {} in entry log", ledgerId); @@ -174,6 +174,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio public boolean accept(long ledgerId) { return true; } + + @Override + public int getLengthToRead() { + return EntryLogScanner.READ_NOTHING; + } }); ++completedEntryLogs; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index b9aaebec9e2..b64d904fb81 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -100,7 +100,7 @@ public void initiate() throws IOException { for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { long entryId = entry.getLong(8); // Actual location indexed is pointing past the entry size @@ -135,6 +135,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio public boolean accept(long ledgerId) { return activeLedgers.contains(ledgerId); } + + @Override + public int getLengthToRead() { + return EntryLogScanner.READ_ENTRY_ID; + } }); ++completedEntryLogs; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java index edef6609ff2..d44e491f06c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java @@ -184,12 +184,11 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long entryStartPos, ByteBuf entry) throws IOException { + public void process(long ledgerId, long entryStartPos, ByteBuf entry, int entrySize) throws IOException { if (!stopScanning.booleanValue()) { if ((rangeEndPos != -1) && (entryStartPos > rangeEndPos)) { stopScanning.setValue(true); } else { - int entrySize = entry.readableBytes(); /** * entrySize of an entry (inclusive of payload and * header) value is stored as int value in log file, but @@ -256,7 +255,7 @@ public boolean accept(long candidateLedgerId) { } @Override - public void process(long candidateLedgerId, long startPos, ByteBuf entry) { + public void process(long candidateLedgerId, long startPos, ByteBuf entry, int entrySize) { long entrysLedgerId = entry.getLong(entry.readerIndex()); long entrysEntryId = entry.getLong(entry.readerIndex() + 8); if ((candidateLedgerId == entrysLedgerId) && (candidateLedgerId == ledgerId) @@ -289,7 +288,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long startPos, ByteBuf entry) { + public void process(long ledgerId, long startPos, ByteBuf entry, int entrySize) { FormatUtil.formatEntry(startPos, entry, printMsg, ledgerIdFormatter, entryFormatter); } }); From a9c6c78ff375dc0926685960a03a5e843fe91229 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 16:34:03 +0800 Subject: [PATCH 02/13] fix. --- .../bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java | 2 +- .../bookkeeper/bookie/TransactionalEntryLogCompactor.java | 2 +- .../org/apache/bookkeeper/bookie/storage/EntryLogScanner.java | 1 - .../bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index ec90507fe74..bf923edeac0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -141,7 +141,7 @@ public boolean accept(long ledgerId) { @Override public int getLengthToRead() { - return EntryLogScanner.READ_ENTRY_ID; + return EntryLogScanner.READ_LEDGER_ENTRY_ID; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index aba038ff357..abc3b309cec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -330,7 +330,7 @@ public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) th @Override public int getLengthToRead() { - return EntryLogScanner.READ_LEDGER_ENTRY_ID + return EntryLogScanner.READ_LEDGER_ENTRY_ID; } }); LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index b7ff748c72d..087f5e56409 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -29,7 +29,6 @@ public interface EntryLogScanner { public static final int READ_ALL = Integer.MAX_VALUE; public static final int READ_NOTHING = 0; - public static final int READ_ENTRY_ID = 8; public static final int READ_LEDGER_ENTRY_ID = 16; /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index b64d904fb81..161161da130 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -138,7 +138,7 @@ public boolean accept(long ledgerId) { @Override public int getLengthToRead() { - return EntryLogScanner.READ_ENTRY_ID; + return EntryLogScanner.READ_LEDGER_ENTRY_ID; } }); From 7dc76893eece23dca0ac58dd2560291580cd5bd4 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 16:53:07 +0800 Subject: [PATCH 03/13] fix. --- .../bookie/storage/directentrylogger/LogReaderScan.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java index 9718795143d..7a032f64e0d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java @@ -51,7 +51,7 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s reader.readIntoBufferAt(entry, offset, entrySize); long ledgerId = entry.getLong(0); if (ledgerId >= 0 && scanner.accept(ledgerId)) { - scanner.process(ledgerId, initOffset, entry); + scanner.process(ledgerId, initOffset, entry, entrySize); } offset += entrySize; } From 5c3823dcfdb9647d851f5001b07c1e5f2b43a84e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 17:54:00 +0800 Subject: [PATCH 04/13] introduce enum ReadLengthType. --- .../bookkeeper/bookie/DefaultEntryLogger.java | 6 ++--- .../InterleavedStorageRegenerateIndexOp.java | 4 ++-- .../TransactionalEntryLogCompactor.java | 4 ++-- .../bookie/storage/EntryLogScanner.java | 24 +++++++++++++++---- .../directentrylogger/DirectEntryLogger.java | 4 ++-- .../storage/ldb/LedgersIndexRebuildOp.java | 4 ++-- .../storage/ldb/LocationsIndexRebuildOp.java | 4 ++-- 7 files changed, 32 insertions(+), 18 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 64d74b0faac..afd1b59c1f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1036,7 +1036,7 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce } // read the entry data.clear(); - int capacity = Math.min(scanner.getLengthToRead(), entrySize); + int capacity = Math.min(scanner.getLengthToRead().getLengthToRead(), entrySize); // skip read when scanner.getLengthToRead() == 0. if (capacity > 0) { data.capacity(capacity); @@ -1183,9 +1183,9 @@ public boolean accept(long ledgerId) { } @Override - public int getLengthToRead(){ + public ReadLengthType getLengthToRead(){ // we only need to read the entry size. - return EntryLogScanner.READ_NOTHING; + return ReadLengthType.READ_NOTHING; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index bf923edeac0..608aae92c04 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -140,8 +140,8 @@ public boolean accept(long ledgerId) { } @Override - public int getLengthToRead() { - return EntryLogScanner.READ_LEDGER_ENTRY_ID; + public ReadLengthType getLengthToRead() { + return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index abc3b309cec..9db2e45f370 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -329,8 +329,8 @@ public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) th } @Override - public int getLengthToRead() { - return EntryLogScanner.READ_LEDGER_ENTRY_ID; + public ReadLengthType getLengthToRead() { + return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; } }); LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index 087f5e56409..61179e3ae9c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -27,9 +27,23 @@ * Scan entries in a entry log file. */ public interface EntryLogScanner { - public static final int READ_ALL = Integer.MAX_VALUE; - public static final int READ_NOTHING = 0; - public static final int READ_LEDGER_ENTRY_ID = 16; + enum ReadLengthType { + // Read all data of the entry + READ_ALL(Integer.MAX_VALUE), + // Read nothing of the entry + READ_NOTHING(0), + // Read ledger id(8 byte) and entry id(8 byte) in the beginning of the entry + READ_LEDGER_ENTRY_ID_LENGTH(16); + + private final int lengthToRead; + ReadLengthType(int lengthToRead) { + this.lengthToRead = lengthToRead; + } + + public int getLengthToRead() { + return lengthToRead; + } + } /** * Tests whether or not the entries belongs to the specified ledger @@ -54,7 +68,7 @@ public interface EntryLogScanner { */ void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException; - default int getLengthToRead(){ - return READ_ALL; + default ReadLengthType getLengthToRead(){ + return ReadLengthType.READ_ALL; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 24f4a86d503..5fdcc603813 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -437,8 +437,8 @@ public boolean accept(long ledgerId) { } @Override - public int getLengthToRead() { - return EntryLogScanner.READ_NOTHING; + public ReadLengthType getLengthToRead() { + return ReadLengthType.READ_NOTHING; } }); return meta; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index f166a9b1ea1..b36f6e0738d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -176,8 +176,8 @@ public boolean accept(long ledgerId) { } @Override - public int getLengthToRead() { - return EntryLogScanner.READ_NOTHING; + public ReadLengthType getLengthToRead() { + return ReadLengthType.READ_NOTHING; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 161161da130..178070927a8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -137,8 +137,8 @@ public boolean accept(long ledgerId) { } @Override - public int getLengthToRead() { - return EntryLogScanner.READ_LEDGER_ENTRY_ID; + public ReadLengthType getLengthToRead() { + return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; } }); From 4aa6134050a5ef4e7d122a3071e69aa9b5145f8d Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 18:21:36 +0800 Subject: [PATCH 05/13] introduce different process method. --- .../bookkeeper/bookie/DefaultEntryLogger.java | 5 +-- .../InterleavedStorageRegenerateIndexOp.java | 4 +-- .../TransactionalEntryLogCompactor.java | 13 +++---- .../bookie/storage/EntryLogScanner.java | 36 +++++++++++++------ .../directentrylogger/DirectEntryLogger.java | 5 +-- .../storage/ldb/LedgersIndexRebuildOp.java | 2 +- .../storage/ldb/LocationsIndexRebuildOp.java | 4 +-- 7 files changed, 35 insertions(+), 34 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index afd1b59c1f7..bb6b462ccbb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1169,10 +1169,7 @@ private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId, // Read through the entry log file and extract the entry log meta scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { - if (throttler != null) { - throttler.acquire(entry.readableBytes()); - } + public void process(long ledgerId, long offset, int entrySize) throws IOException { // add new entry size of a ledger to entry log meta meta.addLedgerSize(ledgerId, entrySize + 4); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index 608aae92c04..c19a9075abe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -114,9 +114,7 @@ public void initiate(boolean dryRun) throws IOException { LOG.info("Scanning {}", entryLogId); entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { - long entryId = entry.getLong(8); - + public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId); // Actual location indexed is pointing past the entry size diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 9db2e45f370..0a59a76caf5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -315,17 +315,14 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { - long lid = entry.getLong(entry.readerIndex()); - long entryId = entry.getLong(entry.readerIndex() + 8); - if (lid != ledgerId || entryId < -1) { - LOG.warn("Scanning expected ledgerId {}, but found invalid entry " - + "with ledgerId {} entryId {} at offset {}", - ledgerId, lid, entryId, offset); + public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { + if (entryId < -1) { + LOG.warn("Scanning found invalid entry with ledgerId {} entryId {} at offset {}", + ledgerId, entryId, offset); throw new IOException("Invalid entry found @ offset " + offset); } long location = (compactionLog.getDstLogId() << 32L) | (offset + 4); - offsets.add(new EntryLocation(lid, entryId, location)); + offsets.add(new EntryLocation(ledgerId, entryId, location)); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index 61179e3ae9c..2d8e738f1ec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -49,24 +49,38 @@ public int getLengthToRead() { * Tests whether or not the entries belongs to the specified ledger * should be processed. * - * @param ledgerId - * Ledger ID. + * @param ledgerId ledger id * @return true if and only the entries of the ledger should be scanned. */ boolean accept(long ledgerId); /** - * Process an entry. - * - * @param ledgerId - * Ledger ID. - * @param offset - * File offset of this entry. - * @param entry - * Entry ByteBuf + * Process an entry when ReadLengthType is READ_NOTHING. + * @param ledgerId ledger id + * @param entrySize entry size + * @throws IOException + */ + default void process(long ledgerId, long offset, int entrySize) throws IOException {} + + /** + * Process an entry when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH. + * @param ledgerId ledger id + * @param offset offset of the entry + * @param entrySize entry size + * @param entryId entry id * @throws IOException */ - void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException; + default void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException {} + + /** + * Process an entry when ReadLengthType is READ_ALL. + * @param ledgerId ledger id + * @param offset offset of the entry + * @param entry entry + * @param entrySize entry size + */ + default void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException{} + default ReadLengthType getLengthToRead(){ return ReadLengthType.READ_ALL; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 5fdcc603813..7758bcd9720 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -423,11 +423,8 @@ EntryLogMetadata scanEntryLogMetadata(long logId, AbstractLogCompactor.Throttler // Read through the entry log file and extract the entry log meta scanEntryLog(logId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { + public void process(long ledgerId, long offset, int entrySize) throws IOException { // add new entry size of a ledger to entry log meta - if (throttler != null) { - throttler.acquire(entry.readableBytes()); - } meta.addLedgerSize(ledgerId, entrySize + Integer.BYTES); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index b36f6e0738d..d3636c0a78b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -162,7 +162,7 @@ private void scanEntryLogFiles(Set ledgers, File[] lDirs) throws IOExcepti for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { + public void process(long ledgerId, long offset, int entrySize) throws IOException { if (ledgers.add(ledgerId)) { if (verbose) { LOG.info("Found ledger {} in entry log", ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 178070927a8..7b86213427b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -100,9 +100,7 @@ public void initiate() throws IOException { for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { - long entryId = entry.getLong(8); - + public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { // Actual location indexed is pointing past the entry size long location = (entryLogId << 32L) | (offset + 4); From 2d4b70087069c92037ce3bc944f315e93d102210 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 20:01:57 +0800 Subject: [PATCH 06/13] refactor. --- .../bookkeeper/bookie/DefaultEntryLogger.java | 45 ++++++++++++------- .../InterleavedStorageRegenerateIndexOp.java | 2 +- .../TransactionalEntryLogCompactor.java | 2 +- .../bookie/storage/EntryLogScanner.java | 2 +- .../directentrylogger/DirectEntryLogger.java | 2 +- .../storage/ldb/LedgersIndexRebuildOp.java | 2 +- .../storage/ldb/LocationsIndexRebuildOp.java | 2 +- 7 files changed, 36 insertions(+), 21 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index bb6b462ccbb..6a08f803c54 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.bookie.storage.EntryLogScanner.ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -1036,21 +1037,35 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce } // read the entry data.clear(); - int capacity = Math.min(scanner.getLengthToRead().getLengthToRead(), entrySize); - // skip read when scanner.getLengthToRead() == 0. - if (capacity > 0) { - data.capacity(capacity); - int rc = readFromLogChannel(entryLogId, bc, data, pos); - if (rc != entrySize) { - LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})", - entryLogId, pos, rc, entrySize); - return; - } + int rc; + // process the entry based on the read length type + switch (scanner.getReadLengthType()) { + case READ_NOTHING: + // skip read + scanner.process(ledgerId, offset, entrySize); + break; + case READ_LEDGER_ENTRY_ID_LENGTH: + data.capacity(READ_LEDGER_ENTRY_ID_LENGTH.getLengthToRead()); + rc = readFromLogChannel(entryLogId, bc, data, pos); + if (rc != READ_LEDGER_ENTRY_ID_LENGTH.getLengthToRead()) { + LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId); + return; + } + // drop the ledger id + data.readLong(); + long entryId = data.readLong(); + scanner.process(ledgerId, offset, entrySize, entryId); + break; + case READ_ALL: + data.capacity(entrySize); + rc = readFromLogChannel(entryLogId, bc, data, pos); + if (rc != entrySize) { + LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId); + return; + } + scanner.process(ledgerId, offset, data, entrySize); + break; } - - // process the entry - scanner.process(ledgerId, offset, data, entrySize); - // Advance position to the next entry pos += entrySize; } @@ -1180,7 +1195,7 @@ public boolean accept(long ledgerId) { } @Override - public ReadLengthType getLengthToRead(){ + public ReadLengthType getReadLengthType(){ // we only need to read the entry size. return ReadLengthType.READ_NOTHING; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index c19a9075abe..4fac1a5c0ef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -138,7 +138,7 @@ public boolean accept(long ledgerId) { } @Override - public ReadLengthType getLengthToRead() { + public ReadLengthType getReadLengthType() { return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 0a59a76caf5..01f7c7c9311 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -326,7 +326,7 @@ public void process(long ledgerId, long offset, int entrySize, long entryId) thr } @Override - public ReadLengthType getLengthToRead() { + public ReadLengthType getReadLengthType() { return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index 2d8e738f1ec..e2bbf6ba97d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -82,7 +82,7 @@ default void process(long ledgerId, long offset, int entrySize, long entryId) th default void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException{} - default ReadLengthType getLengthToRead(){ + default ReadLengthType getReadLengthType(){ return ReadLengthType.READ_ALL; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 7758bcd9720..e0e71f72be7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -434,7 +434,7 @@ public boolean accept(long ledgerId) { } @Override - public ReadLengthType getLengthToRead() { + public ReadLengthType getReadLengthType() { return ReadLengthType.READ_NOTHING; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index d3636c0a78b..fcbd4770da5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -176,7 +176,7 @@ public boolean accept(long ledgerId) { } @Override - public ReadLengthType getLengthToRead() { + public ReadLengthType getReadLengthType() { return ReadLengthType.READ_NOTHING; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 7b86213427b..5a67d4090a0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -135,7 +135,7 @@ public boolean accept(long ledgerId) { } @Override - public ReadLengthType getLengthToRead() { + public ReadLengthType getReadLengthType() { return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; } }); From c6130b3abe218b100fecb16de6925de9b8d21e4e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 20:07:50 +0800 Subject: [PATCH 07/13] fix. --- .../org/apache/bookkeeper/bookie/EntryLogCompactor.java | 2 +- .../bookkeeper/bookie/TransactionalEntryLogCompactor.java | 2 +- .../apache/bookkeeper/bookie/storage/EntryLogScanner.java | 3 +-- .../tools/cli/commands/bookie/ReadLogCommand.java | 8 ++++---- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java index c669b8e77d0..81cd463761a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java @@ -91,7 +91,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(final long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { + public void process(final long ledgerId, long offset, ByteBuf entry) throws IOException { throttler.acquire(entry.readableBytes()); if (offsets.size() > maxOutstandingRequests) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 01f7c7c9311..a221456bc0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -169,7 +169,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException { + public void process(long ledgerId, long offset, ByteBuf entry) throws IOException { throttler.acquire(entry.readableBytes()); synchronized (TransactionalEntryLogCompactor.this) { long lid = entry.getLong(entry.readerIndex()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index e2bbf6ba97d..d2708cc038e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -77,9 +77,8 @@ default void process(long ledgerId, long offset, int entrySize, long entryId) th * @param ledgerId ledger id * @param offset offset of the entry * @param entry entry - * @param entrySize entry size */ - default void process(long ledgerId, long offset, ByteBuf entry, int entrySize) throws IOException{} + default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{} default ReadLengthType getReadLengthType(){ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java index d44e491f06c..3658ec49944 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java @@ -184,7 +184,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long entryStartPos, ByteBuf entry, int entrySize) throws IOException { + public void process(long ledgerId, long entryStartPos, ByteBuf entry) throws IOException { if (!stopScanning.booleanValue()) { if ((rangeEndPos != -1) && (entryStartPos > rangeEndPos)) { stopScanning.setValue(true); @@ -197,7 +197,7 @@ public void process(long ledgerId, long entryStartPos, ByteBuf entry, int entryS * 4 (intsize of entrySize). Please check * EntryLogger.scanEntryLog. */ - long entryEndPos = entryStartPos + entrySize + 4 - 1; + long entryEndPos = entryStartPos + entry.readableBytes() + 4 - 1; if (((rangeEndPos == -1) || (entryStartPos <= rangeEndPos)) && (rangeStartPos <= entryEndPos)) { FormatUtil.formatEntry(entryStartPos, entry, printMsg, ledgerIdFormatter, entryFormatter); entryFound.setValue(true); @@ -255,7 +255,7 @@ public boolean accept(long candidateLedgerId) { } @Override - public void process(long candidateLedgerId, long startPos, ByteBuf entry, int entrySize) { + public void process(long candidateLedgerId, long startPos, ByteBuf entry) { long entrysLedgerId = entry.getLong(entry.readerIndex()); long entrysEntryId = entry.getLong(entry.readerIndex() + 8); if ((candidateLedgerId == entrysLedgerId) && (candidateLedgerId == ledgerId) @@ -288,7 +288,7 @@ public boolean accept(long ledgerId) { } @Override - public void process(long ledgerId, long startPos, ByteBuf entry, int entrySize) { + public void process(long ledgerId, long startPos, ByteBuf entry) { FormatUtil.formatEntry(startPos, entry, printMsg, ledgerIdFormatter, entryFormatter); } }); From 869b5ccea4289fc4605e9a75a8ab7a9747f3bf0f Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 20:14:29 +0800 Subject: [PATCH 08/13] fix. --- .../java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java | 2 +- .../bookie/storage/directentrylogger/LogReaderScan.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 6a08f803c54..6a9561b245f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1063,7 +1063,7 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId); return; } - scanner.process(ledgerId, offset, data, entrySize); + scanner.process(ledgerId, offset, data); break; } // Advance position to the next entry diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java index 7a032f64e0d..9718795143d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java @@ -51,7 +51,7 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s reader.readIntoBufferAt(entry, offset, entrySize); long ledgerId = entry.getLong(0); if (ledgerId >= 0 && scanner.accept(ledgerId)) { - scanner.process(ledgerId, initOffset, entry, entrySize); + scanner.process(ledgerId, initOffset, entry); } offset += entrySize; } From c2fd96ec1702acc0288f470646b1a792473f16fd Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 20:16:36 +0800 Subject: [PATCH 09/13] add code. --- .../bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java index 3658ec49944..edef6609ff2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java @@ -189,6 +189,7 @@ public void process(long ledgerId, long entryStartPos, ByteBuf entry) throws IOE if ((rangeEndPos != -1) && (entryStartPos > rangeEndPos)) { stopScanning.setValue(true); } else { + int entrySize = entry.readableBytes(); /** * entrySize of an entry (inclusive of payload and * header) value is stored as int value in log file, but @@ -197,7 +198,7 @@ public void process(long ledgerId, long entryStartPos, ByteBuf entry) throws IOE * 4 (intsize of entrySize). Please check * EntryLogger.scanEntryLog. */ - long entryEndPos = entryStartPos + entry.readableBytes() + 4 - 1; + long entryEndPos = entryStartPos + entrySize + 4 - 1; if (((rangeEndPos == -1) || (entryStartPos <= rangeEndPos)) && (rangeStartPos <= entryEndPos)) { FormatUtil.formatEntry(entryStartPos, entry, printMsg, ledgerIdFormatter, entryFormatter); entryFound.setValue(true); From 52dff17a5269df96bd3c983e5560c196bab898ee Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 20:36:31 +0800 Subject: [PATCH 10/13] add implementation in LogReaderScan. --- .../bookie/storage/EntryLogScanner.java | 5 ++-- .../directentrylogger/LogReaderScan.java | 23 +++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index d2708cc038e..efd3f4f7f8b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -57,6 +57,7 @@ public int getLengthToRead() { /** * Process an entry when ReadLengthType is READ_NOTHING. * @param ledgerId ledger id + * @param offset init offset of the entry * @param entrySize entry size * @throws IOException */ @@ -65,7 +66,7 @@ default void process(long ledgerId, long offset, int entrySize) throws IOExcepti /** * Process an entry when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH. * @param ledgerId ledger id - * @param offset offset of the entry + * @param offset init offset of the entry * @param entrySize entry size * @param entryId entry id * @throws IOException @@ -75,7 +76,7 @@ default void process(long ledgerId, long offset, int entrySize, long entryId) th /** * Process an entry when ReadLengthType is READ_ALL. * @param ledgerId ledger id - * @param offset offset of the entry + * @param offset init offset of the entry * @param entry entry */ default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java index 9718795143d..323270865d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java @@ -26,6 +26,8 @@ import java.io.IOException; import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import static org.apache.bookkeeper.bookie.storage.directentrylogger.LogMetadata.INVALID_LID; + class LogReaderScan { static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner scanner) throws IOException { int offset = Header.LOGFILE_LEGACY_HEADER_SIZE; @@ -47,11 +49,24 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s // have realigned on the block boundary. offset += Integer.BYTES; + long ledgerId = reader.readLongAt(offset); + if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) { + offset += entrySize; + continue; + } + entry.clear(); - reader.readIntoBufferAt(entry, offset, entrySize); - long ledgerId = entry.getLong(0); - if (ledgerId >= 0 && scanner.accept(ledgerId)) { - scanner.process(ledgerId, initOffset, entry); + switch (scanner.getReadLengthType()) { + case READ_NOTHING: + scanner.process(ledgerId, initOffset, entrySize); + break; + case READ_LEDGER_ENTRY_ID_LENGTH: + long entryId = reader.readLongAt(offset + Long.BYTES); + scanner.process(ledgerId, initOffset, entrySize, entryId); + break; + case READ_ALL: + reader.readIntoBufferAt(entry, offset, entrySize); + scanner.process(ledgerId, initOffset, entry); } offset += entrySize; } From fdddd8b2e56e4660dccbe58ec772153116081614 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 21 Dec 2023 20:42:28 +0800 Subject: [PATCH 11/13] add code. --- .../apache/bookkeeper/bookie/DefaultEntryLogger.java | 12 +++++------- .../bookie/InterleavedStorageRegenerateIndexOp.java | 2 +- .../bookie/TransactionalEntryLogCompactor.java | 2 +- .../bookkeeper/bookie/storage/EntryLogScanner.java | 4 ++-- .../storage/directentrylogger/LogReaderScan.java | 2 +- .../bookie/storage/ldb/LocationsIndexRebuildOp.java | 4 ++-- 6 files changed, 12 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 6a9561b245f..b4255bb2c23 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -22,7 +22,7 @@ package org.apache.bookkeeper.bookie; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.bookkeeper.bookie.storage.EntryLogScanner.ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; +import static org.apache.bookkeeper.bookie.storage.EntryLogScanner.ReadLengthType.READ_LEDGER_ENTRY_ID; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -1044,16 +1044,14 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce // skip read scanner.process(ledgerId, offset, entrySize); break; - case READ_LEDGER_ENTRY_ID_LENGTH: - data.capacity(READ_LEDGER_ENTRY_ID_LENGTH.getLengthToRead()); + case READ_LEDGER_ENTRY_ID: + data.capacity(READ_LEDGER_ENTRY_ID.getLengthToRead()); rc = readFromLogChannel(entryLogId, bc, data, pos); - if (rc != READ_LEDGER_ENTRY_ID_LENGTH.getLengthToRead()) { + if (rc != READ_LEDGER_ENTRY_ID.getLengthToRead()) { LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId); return; } - // drop the ledger id - data.readLong(); - long entryId = data.readLong(); + long entryId = data.getLong(Long.BYTES); scanner.process(ledgerId, offset, entrySize, entryId); break; case READ_ALL: diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index 4fac1a5c0ef..0475819c48a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -139,7 +139,7 @@ public boolean accept(long ledgerId) { @Override public ReadLengthType getReadLengthType() { - return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; + return ReadLengthType.READ_LEDGER_ENTRY_ID; } }); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index a221456bc0e..075052166bc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -327,7 +327,7 @@ public void process(long ledgerId, long offset, int entrySize, long entryId) thr @Override public ReadLengthType getReadLengthType() { - return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; + return ReadLengthType.READ_LEDGER_ENTRY_ID; } }); LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index efd3f4f7f8b..934f3e5d56a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -33,7 +33,7 @@ enum ReadLengthType { // Read nothing of the entry READ_NOTHING(0), // Read ledger id(8 byte) and entry id(8 byte) in the beginning of the entry - READ_LEDGER_ENTRY_ID_LENGTH(16); + READ_LEDGER_ENTRY_ID(16); private final int lengthToRead; ReadLengthType(int lengthToRead) { @@ -77,7 +77,7 @@ default void process(long ledgerId, long offset, int entrySize, long entryId) th * Process an entry when ReadLengthType is READ_ALL. * @param ledgerId ledger id * @param offset init offset of the entry - * @param entry entry + * @param entry entry, containing ledgerId(8byte), entryId(8byte),... entrySize=entry.readableBytes() */ default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java index 323270865d4..32de6c5e1d2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java @@ -60,7 +60,7 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s case READ_NOTHING: scanner.process(ledgerId, initOffset, entrySize); break; - case READ_LEDGER_ENTRY_ID_LENGTH: + case READ_LEDGER_ENTRY_ID: long entryId = reader.readLongAt(offset + Long.BYTES); scanner.process(ledgerId, initOffset, entrySize, entryId); break; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 5a67d4090a0..af901fde95b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -21,7 +21,7 @@ package org.apache.bookkeeper.bookie.storage.ldb; import com.google.common.collect.Sets; -import io.netty.buffer.ByteBuf; + import java.io.File; import java.io.IOException; import java.nio.file.FileSystems; @@ -136,7 +136,7 @@ public boolean accept(long ledgerId) { @Override public ReadLengthType getReadLengthType() { - return ReadLengthType.READ_LEDGER_ENTRY_ID_LENGTH; + return ReadLengthType.READ_LEDGER_ENTRY_ID; } }); From 04f076e49b33f0595498f981c08d731f62b2d5b7 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Sat, 23 Dec 2023 12:11:11 +0800 Subject: [PATCH 12/13] add default case. --- .../java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java | 2 ++ .../bookie/storage/directentrylogger/LogReaderScan.java | 3 +++ 2 files changed, 5 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index b4255bb2c23..a3a0da7ffe4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -1063,6 +1063,8 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce } scanner.process(ledgerId, offset, data); break; + default: + throw new IOException("Unknown read length type " + scanner.getReadLengthType()); } // Advance position to the next entry pos += entrySize; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java index 32de6c5e1d2..fa0aef25e09 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java @@ -67,6 +67,9 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s case READ_ALL: reader.readIntoBufferAt(entry, offset, entrySize); scanner.process(ledgerId, initOffset, entry); + break; + default: + throw new IOException("Unknown read length type: " + scanner.getReadLengthType()); } offset += entrySize; } From 4d2810e4d00a1eb9a5ce42a4c51fd7e12d9ebb23 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Sat, 23 Dec 2023 21:42:23 +0800 Subject: [PATCH 13/13] throw exception when not implemented right. --- .../bookie/storage/EntryLogScanner.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java index 934f3e5d56a..091a3261913 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -25,6 +25,12 @@ /** * Scan entries in a entry log file. + * Implementation for this interface should choose one of the following ReadLengthType: + * READ_ALL, READ_NOTHING, READ_LEDGER_ENTRY_ID_LENGTH.
+ * If the implementation chooses READ_ALL, it should implement {@link #process(long, long, ByteBuf)}.
+ * If the implementation chooses READ_NOTHING, it should implement {@link #process(long, long, int)}.
+ * If the implementation chooses READ_LEDGER_ENTRY_ID_LENGTH, it should implement {@link #process(long, long, int, long)}.
+ * */ public interface EntryLogScanner { enum ReadLengthType { @@ -61,7 +67,9 @@ public int getLengthToRead() { * @param entrySize entry size * @throws IOException */ - default void process(long ledgerId, long offset, int entrySize) throws IOException {} + default void process(long ledgerId, long offset, int entrySize) throws IOException { + throw new UnsupportedOperationException("Not implemented when ReadLengthType is READ_NOTHING"); + } /** * Process an entry when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH. @@ -71,7 +79,9 @@ default void process(long ledgerId, long offset, int entrySize) throws IOExcepti * @param entryId entry id * @throws IOException */ - default void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException {} + default void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException { + throw new UnsupportedOperationException("Not implemented when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH"); + } /** * Process an entry when ReadLengthType is READ_ALL. @@ -79,7 +89,9 @@ default void process(long ledgerId, long offset, int entrySize, long entryId) th * @param offset init offset of the entry * @param entry entry, containing ledgerId(8byte), entryId(8byte),... entrySize=entry.readableBytes() */ - default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{} + default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{ + throw new UnsupportedOperationException("Not implemented when ReadLengthType is READ_ALL"); + } default ReadLengthType getReadLengthType(){