From 26297ac77c05cf0a2b8b56112c140eecf2cb9c06 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Wed, 17 Dec 2025 00:45:57 -0800 Subject: [PATCH 1/5] [fix][broker] Fix cursor position persistence in ledger trimming (#25087) Co-authored-by: Jiwe Guo --- .../mledger/impl/ManagedLedgerImpl.java | 24 ++++++--- .../mledger/impl/ManagedCursorTest.java | 4 +- .../mledger/impl/ManagedLedgerTest.java | 53 +++++++++++++++++++ 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ec1f2801aa386..4b278cf6664d4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2731,14 +2731,22 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { } if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) { - try { - log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); - onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition); - } catch (Exception e) { - log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.", - cursor, cursor.getMarkDeletedPosition(), lastAckedPosition); - log.warn("Caused by", e); - } + Position finalPosition = lastAckedPosition; + log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); + cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), + new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.info("Successfully persisted cursor position for cursor:{} to {}", + cursor, finalPosition); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.", + cursor, cursor.getMarkDeletedPosition(), finalPosition, exception); + } + }, null); } } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 9cd597904fcdb..6a50f7404b509 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1277,7 +1277,9 @@ void cursorPersistence() throws Exception { c2 = ledger.openCursor("c2"); assertEquals(c1.getMarkDeletedPosition(), p1); - assertEquals(c2.getMarkDeletedPosition(), p2); + // move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed + ManagedCursor finalC2 = c2; + Awaitility.await().untilAsserted(() -> assertNotEquals(finalC2.getMarkDeletedPosition(), p2)); } @Test(timeOut = 20000) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 66553fe117bc2..1cdeb415b7c21 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5107,4 +5107,57 @@ public void testComparePositions() throws Exception { // cleanup. ml.delete(); } + + @Test + public void testTrimmerRaceCondition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testTrimmerRaceCondition", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + // 1. Add Entry 1 (Ledger 1) + ledger.addEntry("entry-1".getBytes(Encoding)); + + // 2. Ack Entry 1. Verify Persistence with properties. + List entries = cursor.readEntries(1); + assertEquals(entries.size(), 1); + Position lastPosition = entries.get(0).getPosition(); + entries.forEach(Entry::release); + + // Mark delete with properties + Map properties = new HashMap<>(); + properties.put("test-property", 12345L); + CountDownLatch latch = new CountDownLatch(1); + cursor.asyncMarkDelete(lastPosition, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + + latch.await(); + assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition); + assertEquals(ledger.getCursors().getSlowestCursorPosition(), lastPosition); + assertEquals(cursor.getProperties(), properties); + + // 3. Add Entry 2. Triggers Rollover. + // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover + Position p = ledger.addEntry("entry-2".getBytes(Encoding)); + + // Wait for background tasks (metadata callback) to complete. + // We expect at least 2 ledgers (Rollover happened). + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2); + assertEquals(cursor.getPersistentMarkDeletedPosition(), new ImmutablePositionImpl(p.getLedgerId(), -1)); + + // Verify properties are preserved after cursor reset + assertEquals(cursor.getProperties(), properties); + } } From e041fab73c1b9a6bb93457d186fc0a937beafd53 Mon Sep 17 00:00:00 2001 From: Oneby Wang <44369297+oneby-wang@users.noreply.github.com> Date: Wed, 17 Dec 2025 19:16:20 +0800 Subject: [PATCH 2/5] [fix][admin] Refactor namespace anti affinity group sync operations to async in rest api (#25086) Co-authored-by: oneby-wang --- .../resources/LocalPoliciesResources.java | 6 +- .../pulsar/broker/admin/AdminResource.java | 17 +- .../broker/admin/impl/NamespacesBase.java | 186 ++++++------------ .../pulsar/broker/admin/v1/Namespaces.java | 55 +++++- .../pulsar/broker/admin/v2/Namespaces.java | 58 ++++-- .../pulsar/broker/admin/NamespacesTest.java | 80 ++++++++ .../pulsar/broker/admin/NamespacesV2Test.java | 95 ++++++++- 7 files changed, 332 insertions(+), 165 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index b7ef19ccbe812..8e7b0ab0b1ea6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -37,9 +37,9 @@ public LocalPoliciesResources(MetadataStore localStore, int operationTimeoutSec) super(localStore, LocalPolicies.class, operationTimeoutSec); } - public void setLocalPolicies(NamespaceName ns, Function modifyFunction) - throws MetadataStoreException { - set(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction); + public CompletableFuture setLocalPoliciesAsync(NamespaceName ns, + Function modifyFunction) { + return setAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction); } public Optional getLocalPolicies(NamespaceName ns) throws MetadataStoreException{ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index bfa1fdc812b7b..9ea5b4c33cc09 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.commons.lang3.StringUtils.isBlank; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -498,14 +499,12 @@ protected CompletableFuture getPartitionedTopicMetadat }); } - protected void validateClusterExists(String cluster) { - try { - if (!clusterResources().getCluster(cluster).isPresent()) { + protected CompletableFuture validateClusterExistsAsync(String cluster) { + return clusterResources().clusterExistsAsync(cluster).thenAccept(clusterExist -> { + if (!clusterExist) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); } - } catch (Exception e) { - throw new RestException(e); - } + }); } protected Policies getNamespacePolicies(String tenant, String cluster, String namespace) { @@ -874,6 +873,12 @@ protected void checkNotNull(Object o, String errorMessage) { } } + protected void checkNotBlank(String str, String errorMessage) { + if (isBlank(str)) { + throw new RestException(Status.PRECONDITION_FAILED, errorMessage); + } + } + protected boolean isManagedLedgerNotFoundException(Throwable cause) { return cause instanceof ManagedLedgerException.MetadataNotFoundException || cause instanceof MetadataStoreException.NotFoundException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 0cb4f9a493d6b..bfbd65234813e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin.impl; -import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles; import com.google.common.collect.Sets; import java.lang.reflect.Field; @@ -1038,35 +1037,6 @@ protected CompletableFuture internalDeleteBookieAffinityGroupAsync() { return internalSetBookieAffinityGroupAsync(null); } - @Deprecated - protected BookieAffinityGroupData internalGetBookieAffinityGroup() { - validateSuperUserAccess(); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - try { - final BookieAffinityGroupData bookkeeperAffinityGroup = getLocalPolicies().getLocalPolicies(namespaceName) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Namespace local-policies does not exist")).bookieAffinityGroup; - return bookkeeperAffinityGroup; - } catch (NotFoundException e) { - log.warn("[{}] Failed to get local-policy configuration for namespace {}: does not exist", - clientAppId(), namespaceName); - throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist"); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to get local-policy configuration for namespace {}", clientAppId(), - namespaceName, e); - throw new RestException(e); - } - } - protected CompletableFuture internalGetBookieAffinityGroupAsync() { return validateSuperUserAccessAsync().thenCompose(__ -> { if (namespaceName.isGlobal()) { @@ -1077,9 +1047,8 @@ protected CompletableFuture internalGetBookieAffinityGr unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster())); } }).thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName)) - .thenApply(policies -> policies.orElseThrow( - () -> new RestException(Status.NOT_FOUND, "Namespace local-policies does not exist")) - .bookieAffinityGroup); + .thenApply(policies -> policies.orElseThrow(() -> new RestException(Status.NOT_FOUND, + "Namespace local-policies does not exist")).bookieAffinityGroup); } private CompletableFuture validateLeaderBrokerAsync() { @@ -1842,104 +1811,69 @@ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliver internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies); } - protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); - checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null"); - validatePoliciesReadOnlyAccess(); - - log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName); - - if (isBlank(antiAffinityGroup)) { - throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty"); - } - - try { - getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)-> - lp.map(policies -> new LocalPolicies(policies.bundles, - policies.bookieAffinityGroup, - antiAffinityGroup, - policies.migrated)) - .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup)) - ); - log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), - namespaceName, antiAffinityGroup); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, - e); - throw new RestException(e); - } - } - - protected String internalGetNamespaceAntiAffinityGroup() { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); - - try { - return getLocalPolicies() - .getLocalPolicies(namespaceName) - .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()) - , null, null)).namespaceAntiAffinityGroup; - } catch (Exception e) { - log.error("[{}] Failed to get the antiAffinityGroup of namespace {}", clientAppId(), namespaceName, e); - throw new RestException(Status.NOT_FOUND, "Couldn't find namespace policies"); - } - } - - protected void internalRemoveNamespaceAntiAffinityGroup() { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - - log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName); - - try { - getLocalPolicies().setLocalPolicies(namespaceName, (policies)-> - new LocalPolicies(policies.bundles, - policies.bookieAffinityGroup, - null, - policies.migrated)); - log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); - throw new RestException(e); - } + protected CompletableFuture internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) { + checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null"); + checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty"); + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()).thenCompose( + __ -> getDefaultBundleDataAsync().thenCompose( + defaultBundleData -> getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName, + oldPolicies -> oldPolicies.map(policies -> new LocalPolicies(policies.bundles, + policies.bookieAffinityGroup, antiAffinityGroup, + policies.migrated)) + .orElseGet(() -> new LocalPolicies(defaultBundleData, null, + antiAffinityGroup))))) + .thenAccept(__ -> log.info( + "[{}] Successfully updated namespace anti-affinity group, namespace={}, anti-affinity" + + " group={}", clientAppId(), namespaceName, antiAffinityGroup)); + } + + protected CompletableFuture internalGetNamespaceAntiAffinityGroupAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ) + .thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName) + .thenApply(policiesOpt -> policiesOpt.map(localPolicies -> localPolicies.namespaceAntiAffinityGroup) + .orElse(null))); + } + + protected CompletableFuture internalRemoveNamespaceAntiAffinityGroupAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> { + log.info("[{}] Removing anti-affinity group for namespace: {}", clientAppId(), namespaceName); + return getLocalPolicies().setLocalPoliciesAsync(namespaceName, + (policies) -> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, null, + policies.migrated)); + }) + .thenAccept(__ -> log.info("[{}] Successfully removed anti-affinity group for namespace: {}", + clientAppId(), namespaceName)); } - protected List internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup, - String tenant) { - validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); + protected CompletableFuture> internalGetAntiAffinityNamespacesAsync(String cluster, + String antiAffinityGroup, + String tenant) { checkNotNull(cluster, "Cluster should not be null"); - checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null"); + checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null"); checkNotNull(tenant, "Tenant should not be null"); + checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty"); - log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster); - - if (isBlank(antiAffinityGroup)) { - throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty."); - } - validateClusterExists(cluster); - - try { - List namespaces = tenantResources().getListOfNamespaces(tenant); - - return namespaces.stream().filter(ns -> { - Optional policies; - try { - policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - String storedAntiAffinityGroup = policies.orElseGet(() -> - new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), - null, null)).namespaceAntiAffinityGroup; - return antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup); - }).collect(Collectors.toList()); - - } catch (Exception e) { - log.warn("Failed to list of properties/namespace from global-zk", e); - throw new RestException(e); - } + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ) + .thenCompose(__ -> validateClusterExistsAsync(cluster)) + .thenCompose(__ -> { + log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, + cluster); + return tenantResources().getListOfNamespacesAsync(tenant).thenCompose(namespaces -> { + List> nsFutures = namespaces.stream() + .map(ns -> getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns)) + .thenApply(policiesOpt -> policiesOpt.map( + localPolicies -> localPolicies.namespaceAntiAffinityGroup).orElse(null)) + .thenApply(antiAffinityGroup::equalsIgnoreCase) + .thenApply(equals -> equals ? ns : null)).toList(); + CompletableFuture allFuture = FutureUtil.waitForAll(nsFutures); + return allFuture.thenApply( + unused -> nsFutures.stream().map(CompletableFuture::join).filter(Objects::nonNull) + .toList()); + }); + }); } private boolean checkQuotas(Policies policies, RetentionPolicies retention) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index c6b8dbf5d2d1e..cad6899c8a290 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -586,10 +586,19 @@ public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncRespo @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") }) - public void setNamespaceAntiAffinityGroup(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, String antiAffinityGroup) { + public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, String antiAffinityGroup) { validateNamespaceName(property, cluster, namespace); - internalSetNamespaceAntiAffinityGroup(antiAffinityGroup); + internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set namespace anti-affinity group, tenant: {}, namespace: {}, " + + "antiAffinityGroup: {}", clientAppId(), property, namespace, antiAffinityGroup, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -597,10 +606,19 @@ public void setNamespaceAntiAffinityGroup(@PathParam("property") String property @ApiOperation(value = "Get anti-affinity group of a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public String getNamespaceAntiAffinityGroup(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetNamespaceAntiAffinityGroup(); + internalGetNamespaceAntiAffinityGroupAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), property, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -609,10 +627,19 @@ public String getNamespaceAntiAffinityGroup(@PathParam("property") String proper + " api can be only accessed by admin of any of the existing property") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.")}) - public List getAntiAffinityNamespaces(@PathParam("cluster") String cluster, + public void getAntiAffinityNamespaces(@Suspended AsyncResponse asyncResponse, + @PathParam("cluster") String cluster, @PathParam("group") String antiAffinityGroup, @QueryParam("property") String property) { - return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, property); + internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup, property) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get all namespaces in cluster of given anti-affinity group, cluster: {}, " + + "tenant: {}, antiAffinityGroup: {}", clientAppId(), cluster, property, antiAffinityGroup, + ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -621,11 +648,19 @@ public List getAntiAffinityNamespaces(@PathParam("cluster") String clust @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification")}) - public void removeNamespaceAntiAffinityGroup(@PathParam("property") String property, + public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - internalRemoveNamespaceAntiAffinityGroup(); + internalRemoveNamespaceAntiAffinityGroupAsync() + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to remove namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), property, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 62bf5cd7aea77..90f4b087bfe85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2138,13 +2138,21 @@ public void removeMaxSubscriptionsPerTopic(@PathParam("tenant") String tenant, @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid antiAffinityGroup")}) - public void setNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, + public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Anti-affinity group for the specified namespace", required = true) - String antiAffinityGroup) { + String antiAffinityGroup) { validateNamespaceName(tenant, namespace); - internalSetNamespaceAntiAffinityGroup(antiAffinityGroup); + internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set namespace anti-affinity group, tenant: {}, namespace: {}, " + + "antiAffinityGroup: {}", clientAppId(), tenant, namespace, antiAffinityGroup, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -2152,10 +2160,18 @@ public void setNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, @ApiOperation(value = "Get anti-affinity group of a namespace.", response = String.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public String getNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetNamespaceAntiAffinityGroup(); + internalGetNamespaceAntiAffinityGroupAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), tenant, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -2166,10 +2182,18 @@ public String getNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public void removeNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalRemoveNamespaceAntiAffinityGroup(); + internalRemoveNamespaceAntiAffinityGroupAsync() + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to remove namespace anti-affinity group, tenant: {}, namespace: {}", + clientAppId(), tenant, namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -2179,9 +2203,19 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("tenant") String tenant, response = String.class, responseContainer = "List") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.")}) - public List getAntiAffinityNamespaces(@PathParam("cluster") String cluster, - @PathParam("group") String antiAffinityGroup, @QueryParam("tenant") String tenant) { - return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, tenant); + public void getAntiAffinityNamespaces(@Suspended AsyncResponse asyncResponse, + @PathParam("cluster") String cluster, + @PathParam("group") String antiAffinityGroup, + @QueryParam("tenant") String tenant) { + internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup, tenant) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get all namespaces in cluster of given anti-affinity group, cluster: {}, " + + "tenant: {}, antiAffinityGroup: {}", clientAppId(), cluster, tenant, antiAffinityGroup, + ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 2aaa5676d5b9d..2f5b99bfd9de1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2433,4 +2433,84 @@ public void testSetAndDeleteBookieAffinityGroup() throws Exception { setBookieAffinityGroupNs)); assertNull(bookieAffinityGroupDataResp); } + + @Test + public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception { + // 1. create namespace with empty policies, namespace anti affinity group should be null + String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns"; + asyncRequests(response -> namespaces.createNamespace(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs, (Policies) null)); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + + // 2.set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); + + // 3.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + + // 4.delete namespace anti affinity group + asyncRequests(response -> namespaces.removeNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + + // 5.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + } + + @Test + public void testGetClusterAntiAffinityNamespaces() throws Exception { + // create 5 namespaces, 3 namespaces are set to the same namespace anti affinity group, + // 2 namespaces are not set to any anti affinity group + String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1"; + String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2"; + String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3"; + String namespaceWithoutAntiAffinity1 = "namespace-without-anti-affinity-1"; + String namespaceWithoutAntiAffinity2 = "namespace-without-anti-affinity-2"; + + // create namespaces + List allNamespaces = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3, + namespaceWithoutAntiAffinity1, namespaceWithoutAntiAffinity2); + for (String namespace : allNamespaces) { + asyncRequests(response -> namespaces.createNamespace(response, testTenant, testLocalCluster, namespace, + (Policies) null)); + } + + // set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + List namespacesWithAntiAffinityGroup = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3); + for (String namespace : namespacesWithAntiAffinityGroup) { + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + namespace, namespaceAntiAffinityGroupReq)); + } + + // assert namespace anti affinity group + for (String namespace : namespacesWithAntiAffinityGroup) { + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster, + namespace)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + } + + // get namespaces in cluster of given anti affinity group + List namespacesResp = (List) asyncRequests( + response -> namespaces.getAntiAffinityNamespaces(response, testLocalCluster, + namespaceAntiAffinityGroupReq, testTenant)); + List namespacesWithFullPath = + namespacesWithAntiAffinityGroup.stream().map(ns -> NamespaceName.get(testTenant, testLocalCluster, ns)) + .map(NamespaceName::toString).toList(); + assertEquals(namespacesResp, namespacesWithFullPath); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java index 629e92c056f4c..596cfa3f396cf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java @@ -305,8 +305,8 @@ public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws Exceptio // 2.set namespace anti affinity group String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; - namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs, - namespaceAntiAffinityGroupReq); + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); // 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles() BundlesData bundlesData = (BundlesData) asyncRequests( @@ -314,8 +314,9 @@ public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws Exceptio assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); // 4.assert namespace anti affinity group - String namespaceAntiAffinityGroupResp = - namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); } @@ -330,8 +331,8 @@ public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() throws Ex // 2.set namespace anti affinity group String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; - namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs, - namespaceAntiAffinityGroupReq); + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); // 3.query namespace num bundles, should be policies.bundles, which we set before BundlesData bundlesData = (BundlesData) asyncRequests( @@ -339,8 +340,9 @@ public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() throws Ex assertEquals(bundlesData, policies.bundles); // 4.assert namespace anti affinity group - String namespaceAntiAffinityGroupResp = - namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); } @@ -418,4 +420,81 @@ public void testSetAndDeleteBookieAffinityGroup() throws Exception { assertNull(bookieAffinityGroupDataResp); } + @Test + public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception { + // 1. create namespace with empty policies, namespace anti affinity group should be null + String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns"; + asyncRequests( + response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, null)); + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + + // 2.set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs, namespaceAntiAffinityGroupReq)); + + // 3.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + + // 4.delete namespace anti affinity group + asyncRequests(response -> namespaces.removeNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + + // 5.assert namespace anti affinity group + namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, + setNamespaceAntiAffinityGroupNs)); + assertNull(namespaceAntiAffinityGroupResp); + } + + @Test + public void testGetClusterAntiAffinityNamespaces() throws Exception { + // create 5 namespaces, 3 namespaces are set to the same namespace anti affinity group, + // 2 namespaces are not set to any anti affinity group + String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1"; + String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2"; + String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3"; + String namespaceWithoutAntiAffinity1 = "namespace-without-anti-affinity-1"; + String namespaceWithoutAntiAffinity2 = "namespace-without-anti-affinity-2"; + + // create namespaces + List allNamespaces = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3, + namespaceWithoutAntiAffinity1, namespaceWithoutAntiAffinity2); + for (String namespace : allNamespaces) { + asyncRequests(response -> namespaces.createNamespace(response, testTenant, namespace, null)); + } + + // set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + List namespacesWithAntiAffinityGroup = + List.of(namespaceWithAntiAffinity1, namespaceWithAntiAffinity2, namespaceWithAntiAffinity3); + for (String namespace : namespacesWithAntiAffinityGroup) { + asyncRequests(response -> namespaces.setNamespaceAntiAffinityGroup(response, testTenant, namespace, + namespaceAntiAffinityGroupReq)); + } + + // assert namespace anti affinity group + for (String namespace : namespacesWithAntiAffinityGroup) { + String namespaceAntiAffinityGroupResp = (String) asyncRequests( + response -> namespaces.getNamespaceAntiAffinityGroup(response, testTenant, namespace)); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + } + + // get namespaces in cluster of given anti affinity group + List namespacesResp = (List) asyncRequests( + response -> namespaces.getAntiAffinityNamespaces(response, testLocalCluster, + namespaceAntiAffinityGroupReq, testTenant)); + List namespacesWithFullPath = + namespacesWithAntiAffinityGroup.stream().map(ns -> NamespaceName.get(testTenant, ns)) + .map(NamespaceName::toString).toList(); + assertEquals(namespacesResp, namespacesWithFullPath); + } + } From 712e2f6aa48124cfdaec36635a57c2390d460aad Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 16 Dec 2025 10:33:05 +0800 Subject: [PATCH 3/5] fix error in delayedMessagesCount --- .../InMemoryDelayedDeliveryTracker.java | 18 +++-- .../delayed/InMemoryDeliveryTrackerTest.java | 65 +++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index ad5ab25fbbf6b..d14f2ff294387 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -126,10 +126,13 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) - .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) - .add(entryId); - delayedMessagesCount.incrementAndGet(); + Roaring64Bitmap roaring64Bitmap = delayedMessageMap + .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + if (!roaring64Bitmap.contains(entryId)) { + roaring64Bitmap.add(entryId); + delayedMessagesCount.incrementAndGet(); + } updateTimer(); @@ -200,7 +203,12 @@ public NavigableSet getScheduledMessages(int maxMessages) { delayedMessagesCount.addAndGet(-n); n = 0; } - if (n <= 0) { + if (n == 0) { + break; + } else if (n < 0) { + // should not go into this situation + log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}", + dispatcher.getName(), n); break; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index c5a564d1b664b..c7834ea2e3c10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -274,4 +274,69 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc tracker.close(); } + @Test(dataProvider = "delayedTracker") + public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDeliveryTracker tracker) throws Exception { + assertFalse(tracker.hasMessageAvailable()); + + // case1: addMessage() with duplicate entryId, + // getScheduledMessages() enter "cardinality <= n" and make tracker empty + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(1, 3, 40)); + assertTrue(tracker.addMessage(1, 4, 50)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 4L); + + Set scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 4); + assertEquals(tracker.getNumberOfDelayedMessages(), 0L); + + + + // case2: addMessage() with duplicate entryId, + // getScheduledMessages() enter "cardinality > n" and make tracker empty + clockTime.set(0); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 3, 10)); + assertTrue(tracker.addMessage(1, 4, 10)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 1L); + + scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 4); + assertEquals(tracker.getNumberOfDelayedMessages(), 0L); + + + + // case3: addMessage() with duplicate entryId, + // getScheduledMessages() make tracker remain half cardinality + clockTime.set(0); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 3, 10)); + assertTrue(tracker.addMessage(1, 4, 10)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 1L); + + scheduled = tracker.getScheduledMessages(2); + assertEquals(scheduled.size(), 2); + assertEquals(tracker.getNumberOfDelayedMessages(), 2L); + + + tracker.close(); + } + } From ca48bb363027f588c3bd46798d1c0ea0ea297dfe Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 16 Dec 2025 18:04:28 +0800 Subject: [PATCH 4/5] use addLong --- .../pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index d14f2ff294387..d16803d897eea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -130,7 +130,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); if (!roaring64Bitmap.contains(entryId)) { - roaring64Bitmap.add(entryId); + roaring64Bitmap.addLong(entryId); delayedMessagesCount.incrementAndGet(); } From 9b562dccf145b01aa4af052db9d3a21aa26882a8 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 17 Dec 2025 11:38:47 +0800 Subject: [PATCH 5/5] change comment --- .../broker/delayed/InMemoryDeliveryTrackerTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index c7834ea2e3c10..cbe81212f41dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -279,7 +279,8 @@ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDelivery assertFalse(tracker.hasMessageAvailable()); // case1: addMessage() with duplicate entryId, - // getScheduledMessages() enter "cardinality <= n" and make tracker empty + // getScheduledMessages() enter multiple timestamp and "cardinality <= n", + // finally make tracker empty assertTrue(tracker.addMessage(1, 1, 10)); assertTrue(tracker.addMessage(1, 2, 20)); assertTrue(tracker.addMessage(1, 2, 20)); @@ -298,7 +299,8 @@ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDelivery // case2: addMessage() with duplicate entryId, - // getScheduledMessages() enter "cardinality > n" and make tracker empty + // getScheduledMessages() enter one timestamp and "cardinality <= n", + // finally make tracker empty clockTime.set(0); assertTrue(tracker.addMessage(1, 1, 10)); assertTrue(tracker.addMessage(1, 2, 10)); @@ -318,7 +320,8 @@ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDelivery // case3: addMessage() with duplicate entryId, - // getScheduledMessages() make tracker remain half cardinality + // getScheduledMessages() enter one timestamp and "cardinality > n", + // finally make tracker remain half cardinality clockTime.set(0); assertTrue(tracker.addMessage(1, 1, 10)); assertTrue(tracker.addMessage(1, 2, 10));