Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2731,14 +2731,22 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
}

if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
try {
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition);
} catch (Exception e) {
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
cursor, cursor.getMarkDeletedPosition(), lastAckedPosition);
log.warn("Caused by", e);
}
Position finalPosition = lastAckedPosition;
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.info("Successfully persisted cursor position for cursor:{} to {}",
cursor, finalPosition);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
cursor, cursor.getMarkDeletedPosition(), finalPosition, exception);
}
}, null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,9 @@ void cursorPersistence() throws Exception {
c2 = ledger.openCursor("c2");

assertEquals(c1.getMarkDeletedPosition(), p1);
assertEquals(c2.getMarkDeletedPosition(), p2);
// move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed
ManagedCursor finalC2 = c2;
Awaitility.await().untilAsserted(() -> assertNotEquals(finalC2.getMarkDeletedPosition(), p2));
}

@Test(timeOut = 20000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5107,4 +5107,57 @@ public void testComparePositions() throws Exception {
// cleanup.
ml.delete();
}

@Test
public void testTrimmerRaceCondition() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
config.setRetentionTime(0, TimeUnit.MILLISECONDS);
config.setRetentionSizeInMB(0);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testTrimmerRaceCondition", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");

// 1. Add Entry 1 (Ledger 1)
ledger.addEntry("entry-1".getBytes(Encoding));

// 2. Ack Entry 1. Verify Persistence with properties.
List<Entry> entries = cursor.readEntries(1);
assertEquals(entries.size(), 1);
Position lastPosition = entries.get(0).getPosition();
entries.forEach(Entry::release);

// Mark delete with properties
Map<String, Long> properties = new HashMap<>();
properties.put("test-property", 12345L);
CountDownLatch latch = new CountDownLatch(1);
cursor.asyncMarkDelete(lastPosition, properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
latch.countDown();
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
fail("Mark delete should succeed");
}
}, null);

latch.await();
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition);
assertEquals(ledger.getCursors().getSlowestCursorPosition(), lastPosition);
assertEquals(cursor.getProperties(), properties);

// 3. Add Entry 2. Triggers Rollover.
// This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover
Position p = ledger.addEntry("entry-2".getBytes(Encoding));

// Wait for background tasks (metadata callback) to complete.
// We expect at least 2 ledgers (Rollover happened).
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2);
assertEquals(cursor.getPersistentMarkDeletedPosition(), new ImmutablePositionImpl(p.getLedgerId(), -1));

// Verify properties are preserved after cursor reset
assertEquals(cursor.getProperties(), properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public LocalPoliciesResources(MetadataStore localStore, int operationTimeoutSec)
super(localStore, LocalPolicies.class, operationTimeoutSec);
}

public void setLocalPolicies(NamespaceName ns, Function<LocalPolicies, LocalPolicies> modifyFunction)
throws MetadataStoreException {
set(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction);
public CompletableFuture<Void> setLocalPoliciesAsync(NamespaceName ns,
Function<LocalPolicies, LocalPolicies> modifyFunction) {
return setAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction);
}

public Optional<LocalPolicies> getLocalPolicies(NamespaceName ns) throws MetadataStoreException{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.commons.lang3.StringUtils.isBlank;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
Expand Down Expand Up @@ -498,14 +499,12 @@ protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadat
});
}

protected void validateClusterExists(String cluster) {
try {
if (!clusterResources().getCluster(cluster).isPresent()) {
protected CompletableFuture<Void> validateClusterExistsAsync(String cluster) {
return clusterResources().clusterExistsAsync(cluster).thenAccept(clusterExist -> {
if (!clusterExist) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
}
} catch (Exception e) {
throw new RestException(e);
}
});
}

protected Policies getNamespacePolicies(String tenant, String cluster, String namespace) {
Expand Down Expand Up @@ -874,6 +873,12 @@ protected void checkNotNull(Object o, String errorMessage) {
}
}

protected void checkNotBlank(String str, String errorMessage) {
if (isBlank(str)) {
throw new RestException(Status.PRECONDITION_FAILED, errorMessage);
}
}

protected boolean isManagedLedgerNotFoundException(Throwable cause) {
return cause instanceof ManagedLedgerException.MetadataNotFoundException
|| cause instanceof MetadataStoreException.NotFoundException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -1038,35 +1037,6 @@ protected CompletableFuture<Void> internalDeleteBookieAffinityGroupAsync() {
return internalSetBookieAffinityGroupAsync(null);
}

@Deprecated
protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
validateSuperUserAccess();

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}
try {
final BookieAffinityGroupData bookkeeperAffinityGroup = getLocalPolicies().getLocalPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace local-policies does not exist")).bookieAffinityGroup;
return bookkeeperAffinityGroup;
} catch (NotFoundException e) {
log.warn("[{}] Failed to get local-policy configuration for namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist");
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get local-policy configuration for namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected CompletableFuture<BookieAffinityGroupData> internalGetBookieAffinityGroupAsync() {
return validateSuperUserAccessAsync().thenCompose(__ -> {
if (namespaceName.isGlobal()) {
Expand All @@ -1077,9 +1047,8 @@ protected CompletableFuture<BookieAffinityGroupData> internalGetBookieAffinityGr
unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster()));
}
}).thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName))
.thenApply(policies -> policies.orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace local-policies does not exist"))
.bookieAffinityGroup);
.thenApply(policies -> policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace local-policies does not exist")).bookieAffinityGroup);
}

private CompletableFuture<Void> validateLeaderBrokerAsync() {
Expand Down Expand Up @@ -1842,104 +1811,69 @@ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliver
internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies);
}

protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
validatePoliciesReadOnlyAccess();

log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);

if (isBlank(antiAffinityGroup)) {
throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty");
}

try {
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
antiAffinityGroup,
policies.migrated))
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
);
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
namespaceName, antiAffinityGroup);
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
}
}

protected String internalGetNamespaceAntiAffinityGroup() {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);

try {
return getLocalPolicies()
.getLocalPolicies(namespaceName)
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles())
, null, null)).namespaceAntiAffinityGroup;
} catch (Exception e) {
log.error("[{}] Failed to get the antiAffinityGroup of namespace {}", clientAppId(), namespaceName, e);
throw new RestException(Status.NOT_FOUND, "Couldn't find namespace policies");
}
}

protected void internalRemoveNamespaceAntiAffinityGroup() {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName);

try {
getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
null,
policies.migrated));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
protected CompletableFuture<Void> internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) {
checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null");
checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()).thenCompose(
__ -> getDefaultBundleDataAsync().thenCompose(
defaultBundleData -> getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName,
oldPolicies -> oldPolicies.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup, antiAffinityGroup,
policies.migrated))
.orElseGet(() -> new LocalPolicies(defaultBundleData, null,
antiAffinityGroup)))))
.thenAccept(__ -> log.info(
"[{}] Successfully updated namespace anti-affinity group, namespace={}, anti-affinity"
+ " group={}", clientAppId(), namespaceName, antiAffinityGroup));
}

protected CompletableFuture<String> internalGetNamespaceAntiAffinityGroupAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
.thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName)
.thenApply(policiesOpt -> policiesOpt.map(localPolicies -> localPolicies.namespaceAntiAffinityGroup)
.orElse(null)));
}

protected CompletableFuture<Void> internalRemoveNamespaceAntiAffinityGroupAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
log.info("[{}] Removing anti-affinity group for namespace: {}", clientAppId(), namespaceName);
return getLocalPolicies().setLocalPoliciesAsync(namespaceName,
(policies) -> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, null,
policies.migrated));
})
.thenAccept(__ -> log.info("[{}] Successfully removed anti-affinity group for namespace: {}",
clientAppId(), namespaceName));
}

protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String tenant) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
protected CompletableFuture<List<String>> internalGetAntiAffinityNamespacesAsync(String cluster,
String antiAffinityGroup,
String tenant) {
checkNotNull(cluster, "Cluster should not be null");
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
checkNotNull(antiAffinityGroup, "Anti-affinity group should not be null");
checkNotNull(tenant, "Tenant should not be null");
checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");

log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);

if (isBlank(antiAffinityGroup)) {
throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty.");
}
validateClusterExists(cluster);

try {
List<String> namespaces = tenantResources().getListOfNamespaces(tenant);

return namespaces.stream().filter(ns -> {
Optional<LocalPolicies> policies;
try {
policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
} catch (Exception e) {
throw new RuntimeException(e);
}

String storedAntiAffinityGroup = policies.orElseGet(() ->
new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
null, null)).namespaceAntiAffinityGroup;
return antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup);
}).collect(Collectors.toList());

} catch (Exception e) {
log.warn("Failed to list of properties/namespace from global-zk", e);
throw new RestException(e);
}
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
.thenCompose(__ -> validateClusterExistsAsync(cluster))
.thenCompose(__ -> {
log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup,
cluster);
return tenantResources().getListOfNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>> nsFutures = namespaces.stream()
.map(ns -> getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns))
.thenApply(policiesOpt -> policiesOpt.map(
localPolicies -> localPolicies.namespaceAntiAffinityGroup).orElse(null))
.thenApply(antiAffinityGroup::equalsIgnoreCase)
.thenApply(equals -> equals ? ns : null)).toList();
CompletableFuture<Void> allFuture = FutureUtil.waitForAll(nsFutures);
return allFuture.thenApply(
unused -> nsFutures.stream().map(CompletableFuture::join).filter(Objects::nonNull)
.toList());
});
});
}

private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
Expand Down
Loading
Loading