[fix][broker] Fix PendingAckHandleImpl when replay failed.#18886
[fix][broker] Fix PendingAckHandleImpl when replay failed.#18886tjiuming wants to merge 16 commits intoapache:masterfrom
Conversation
Codecov Report
@@ Coverage Diff @@
## master #18886 +/- ##
============================================
- Coverage 46.17% 38.79% -7.38%
+ Complexity 10359 3027 -7332
============================================
Files 703 289 -414
Lines 68845 24154 -44691
Branches 7382 2805 -4577
============================================
- Hits 31788 9371 -22417
+ Misses 33448 13632 -19816
+ Partials 3609 1151 -2458
Flags with carried forward coverage won't be shown. Click here to find out more.
|
|
The pr had no activity for 30 days, mark with Stale label. |
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
| retryClose(); | ||
| return null; | ||
| }); | ||
| }, 20, TimeUnit.SECONDS); |
There was a problem hiding this comment.
20 seconds ?
Probably we have to log at WARN level something like
log.warn("Re-scheduling closing of subscription {} in 20 seconds")
congbobo184
left a comment
There was a problem hiding this comment.
if recover fail, we only need to close the sub then remove the sub from the PersistentTopic.subscriptions is enough.
https://github.com/nicoloboschi/pulsar/blob/8eb7ee1e9e96d4686e452320cdaba92d5eca7b4f/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java#L947-L952
add logic into this method
| log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " | ||
| + "when initialize topic [{}].", subscriptionName, topic, t); | ||
| if (subscriptions.remove(subscriptionName, subscription)) { | ||
| subscription.retryClose(); |
There was a problem hiding this comment.
why should add this method? persistentTopic init sub then remove will return false?
There was a problem hiding this comment.
I think it may happen.
subscription.getInitializeFuture()
.exceptionally(t -> {
// ignore...
})
will be executed after the topic created. If a consumer connected the topic and unsubscribe the topic, subscriptions.remove(...) may return false.
|
Will be fixed by #21760 |
Motivation
Currently, we don't handle PendingAckHandleImpl replay failed.
If the operation failed, it won't replay again, and won't remove the subscription from PersistentTopic which is already created.
It will lead to an unexpected situation: we couldn't create Consumers on the subscription.
Because in PersistentTopic#subscriptions, the subscription already exists, but initialized failed.
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: tjiuming#15