diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8311def7eab6c..7d93215c13b09 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1183,29 +1183,33 @@ public Position getFirstPosition() { return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0); } - protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) { - if (position.equals(PositionImpl.EARLIEST)) { - position = ledger.getFirstPosition(); - } else if (position.equals(PositionImpl.LATEST)) { - position = ledger.getLastPosition().getNext(); + protected void internalResetCursor(PositionImpl proposedReadPosition, + AsyncCallbacks.ResetCursorCallback resetCursorCallback) { + final PositionImpl newReadPosition; + if (proposedReadPosition.equals(PositionImpl.EARLIEST)) { + newReadPosition = ledger.getFirstPosition(); + } else if (proposedReadPosition.equals(PositionImpl.LATEST)) { + newReadPosition = ledger.getLastPosition().getNext(); + } else { + newReadPosition = proposedReadPosition; } - log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name); + log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name); synchronized (pendingMarkDeleteOps) { if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { - log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", - ledger.getName(), position, name); + log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}", + ledger.getName(), newReadPosition, name); resetCursorCallback.resetFailed( new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), - position); + newReadPosition); return; } } final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback; - final PositionImpl newPosition = position; + final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition); VoidCallback finalCallback = new VoidCallback() { @Override @@ -1214,8 +1218,6 @@ public void operationComplete() { // modify mark delete and read position since we are able to persist new position for cursor lock.writeLock().lock(); try { - PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition); - if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( Range.closedOpen(newMarkDeletePosition, markDeletePosition))); @@ -1230,34 +1232,34 @@ public void operationComplete() { if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle); batchDeletedIndexes.clear(); - long[] resetWords = newPosition.ackSet; + long[] resetWords = newReadPosition.ackSet; if (resetWords != null) { BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords); - batchDeletedIndexes.put(newPosition, ackSet); + batchDeletedIndexes.put(newReadPosition, ackSet); } } PositionImpl oldReadPosition = readPosition; - if (oldReadPosition.compareTo(newPosition) >= 0) { - log.info("[{}] reset position to {} before current read position {} on cursor {}", - ledger.getName(), newPosition, oldReadPosition, name); + if (oldReadPosition.compareTo(newReadPosition) >= 0) { + log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}", + ledger.getName(), newReadPosition, oldReadPosition, name); } else { - log.info("[{}] reset position to {} skipping from current read position {} on cursor {}", - ledger.getName(), newPosition, oldReadPosition, name); + log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on " + + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name); } - readPosition = newPosition; - ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition); + readPosition = newReadPosition; + ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } synchronized (pendingMarkDeleteOps) { pendingMarkDeleteOps.clear(); if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { - log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", - ledger.getName(), newPosition, name); + log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}", + ledger.getName(), newReadPosition, name); } } - callback.resetComplete(newPosition); + callback.resetComplete(newReadPosition); updateLastActive(); } @@ -1265,20 +1267,20 @@ public void operationComplete() { public void operationFailed(ManagedLedgerException exception) { synchronized (pendingMarkDeleteOps) { if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { - log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", - ledger.getName(), newPosition, name); + log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}", + ledger.getName(), newReadPosition, name); } } callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException( - "unable to persist position for cursor reset " + newPosition.toString()), newPosition); + "unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition); } }; persistentMarkDeletePosition = null; inProgressMarkDeletePersistPosition = null; - lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null); - internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), + lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null); + internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index a7b410c808a24..a9b8e358eec7d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -92,6 +92,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.LongPairRangeSet; @@ -666,7 +667,7 @@ void testResetCursor1() throws Exception { } assertTrue(moveStatus.get()); PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1); - assertEquals(earliestPos, cursor.getReadPosition()); + assertEquals(cursor.getReadPosition(), earliestPos); moveStatus.set(false); // reset to one after last entry in a ledger should point to the first entry in the next ledger @@ -3273,6 +3274,126 @@ public void operationFailed(ManagedLedgerException exception) { }); } + @Test(timeOut = 20000) + public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception { + ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry"); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + + // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + c.resetCursor(PositionImpl.LATEST); + + // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); + final Position readPositionBeforeRecover = c.getReadPosition(); + + // Trigger the lastConfirmedEntry to move forward + ml.addEntry(new byte[1]); + + ManagedCursorInfo info = ManagedCursorInfo.newBuilder() + .setCursorsLedgerId(c.getCursorLedger()) + .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId()) + .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId()) + .setLastActive(0L) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + c.recoverFromLedger(info, new VoidCallback() { + @Override + public void operationComplete() { + latch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.set(true); + latch.countDown(); + } + }); + + latch.await(); + if (failed.get()) { + fail("Cursor recovery should not fail"); + } + assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); + assertEquals(c.getReadPosition(), readPositionBeforeRecover); + assertEquals(c.getNumberOfEntries(), 1L); + } + + @Test(timeOut = 20000) + public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception { + ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries"); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + + // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + c.resetCursor(PositionImpl.LATEST); + + // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + // Trigger the lastConfirmedEntry to move forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + c.resetCursor(PositionImpl.LATEST); + + assertEquals(c.getMarkDeletedPosition().getEntryId(), 3); + assertEquals(c.getReadPosition().getEntryId(), 4); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3); + + // Publish messages to move the lastConfirmedEntry field forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); + final Position readPositionBeforeRecover = c.getReadPosition(); + + ManagedCursorInfo info = ManagedCursorInfo.newBuilder() + .setCursorsLedgerId(c.getCursorLedger()) + .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId()) + .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId()) + .setLastActive(0L) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + c.recoverFromLedger(info, new VoidCallback() { + @Override + public void operationComplete() { + latch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.set(true); + latch.countDown(); + } + }); + + latch.await(); + if (failed.get()) { + fail("Cursor recovery should not fail"); + } + assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); + assertEquals(c.getReadPosition(), readPositionBeforeRecover); + assertEquals(c.getNumberOfEntries(), 2L); + } @Test void testAlwaysInactive() throws Exception { ManagedLedger ml = factory.open("testAlwaysInactive");