diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 20f760d893adc..41249b390ba18 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -230,7 +230,7 @@ public MarkDeleteEntry(PositionImpl newPosition, Map properties, } public void triggerComplete() { - // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This + // Trigger the final callback after having (eventually) triggered the switching-ledger operation. This // will ensure that no race condition will happen between the next mark-delete and the switching // operation. if (callbackGroup != null) { @@ -1277,8 +1277,9 @@ public void operationFailed(ManagedLedgerException exception) { persistentMarkDeletePosition = null; inProgressMarkDeletePersistPosition = null; - lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null); - internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), + PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition); + lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null); + internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { @@ -2005,7 +2006,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) { LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) { - // keep the current value since it's later then the mdEntry.newPosition + // keep the current value since it's later than the mdEntry.newPosition return last; } else { return mdEntry; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 0e73db5291e00..7d82f78c3a048 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -111,6 +112,9 @@ public void testSeek() throws Exception { consumer.seek(messageIds.get(5)); assertEquals(sub.getNumberOfEntriesInBacklog(false), 5); + ManagedCursor cursor = topicRef.getSubscription("my-subscription").getCursor(); + assertEquals(cursor.getMarkDeletedPosition(), cursor.getPersistentMarkDeletedPosition()); + MessageIdImpl messageId = (MessageIdImpl) messageIds.get(5); MessageIdImpl beforeEarliest = new MessageIdImpl( messageId.getLedgerId() - 1, messageId.getEntryId(), messageId.getPartitionIndex());