Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1183,29 +1183,33 @@ public Position getFirstPosition() {
return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
}

protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
if (position.equals(PositionImpl.EARLIEST)) {
position = ledger.getFirstPosition();
} else if (position.equals(PositionImpl.LATEST)) {
position = ledger.getLastPosition().getNext();
protected void internalResetCursor(PositionImpl proposedReadPosition,
AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
final PositionImpl newReadPosition;
if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
newReadPosition = ledger.getFirstPosition();
} else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
newReadPosition = ledger.getLastPosition().getNext();
} else {
newReadPosition = proposedReadPosition;
}

log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);
log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name);

synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
ledger.getName(), position, name);
log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}",
ledger.getName(), newReadPosition, name);
resetCursorCallback.resetFailed(
new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
position);
newReadPosition);
return;
}
}

final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;

final PositionImpl newPosition = position;
final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition);

VoidCallback finalCallback = new VoidCallback() {
@Override
Expand All @@ -1214,8 +1218,6 @@ public void operationComplete() {
// modify mark delete and read position since we are able to persist new position for cursor
lock.writeLock().lock();
try {
PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);

if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
Expand All @@ -1230,55 +1232,55 @@ public void operationComplete() {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
long[] resetWords = newPosition.ackSet;
long[] resetWords = newReadPosition.ackSet;
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
batchDeletedIndexes.put(newPosition, ackSet);
batchDeletedIndexes.put(newReadPosition, ackSet);
}
}

PositionImpl oldReadPosition = readPosition;
if (oldReadPosition.compareTo(newPosition) >= 0) {
log.info("[{}] reset position to {} before current read position {} on cursor {}",
ledger.getName(), newPosition, oldReadPosition, name);
if (oldReadPosition.compareTo(newReadPosition) >= 0) {
log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}",
ledger.getName(), newReadPosition, oldReadPosition, name);
} else {
log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
ledger.getName(), newPosition, oldReadPosition, name);
log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on "
+ "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name);
}
readPosition = newPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
synchronized (pendingMarkDeleteOps) {
pendingMarkDeleteOps.clear();
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
ledger.getName(), newPosition, name);
log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
ledger.getName(), newReadPosition, name);
}
}
callback.resetComplete(newPosition);
callback.resetComplete(newReadPosition);
updateLastActive();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
ledger.getName(), newPosition, name);
log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
ledger.getName(), newReadPosition, name);
}
}
callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(
"unable to persist position for cursor reset " + newPosition.toString()), newPosition);
"unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition);
}

};

persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null);
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
Expand Down Expand Up @@ -666,7 +667,7 @@ void testResetCursor1() throws Exception {
}
assertTrue(moveStatus.get());
PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1);
assertEquals(earliestPos, cursor.getReadPosition());
assertEquals(cursor.getReadPosition(), earliestPos);
moveStatus.set(false);

// reset to one after last entry in a ledger should point to the first entry in the next ledger
Expand Down Expand Up @@ -3273,6 +3274,126 @@ public void operationFailed(ManagedLedgerException exception) {
});
}

@Test(timeOut = 20000)
public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception {
ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry");
ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);

// A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

c.resetCursor(PositionImpl.LATEST);

// A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
final Position readPositionBeforeRecover = c.getReadPosition();

// Trigger the lastConfirmedEntry to move forward
ml.addEntry(new byte[1]);

ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(c.getCursorLedger())
.setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
.setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
.setLastActive(0L)
.build();

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
c.recoverFromLedger(info, new VoidCallback() {
@Override
public void operationComplete() {
latch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
failed.set(true);
latch.countDown();
}
});

latch.await();
if (failed.get()) {
fail("Cursor recovery should not fail");
}
assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 1L);
}

@Test(timeOut = 20000)
public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception {
ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries");
ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);

// A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

c.resetCursor(PositionImpl.LATEST);

// A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

// Trigger the lastConfirmedEntry to move forward
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

c.resetCursor(PositionImpl.LATEST);

assertEquals(c.getMarkDeletedPosition().getEntryId(), 3);
assertEquals(c.getReadPosition().getEntryId(), 4);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3);

// Publish messages to move the lastConfirmedEntry field forward
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
final Position readPositionBeforeRecover = c.getReadPosition();

ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(c.getCursorLedger())
.setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
.setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
.setLastActive(0L)
.build();

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
c.recoverFromLedger(info, new VoidCallback() {
@Override
public void operationComplete() {
latch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
failed.set(true);
latch.countDown();
}
});

latch.await();
if (failed.get()) {
fail("Cursor recovery should not fail");
}
assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 2L);
}
@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");
Expand Down