result = future.get(60, TimeUnit.SECONDS);
+ successCount.incrementAndGet();
+ if (result.isEmpty()) {
+ emptyResultCount.incrementAndGet();
+ }
+ } catch (Exception e) {
+ log.error("Operation failed", e);
+ failureCount.incrementAndGet();
+ }
+ }
+
+ long duration = System.currentTimeMillis() - startTime;
+ log.info("High contention test completed in {}ms: success={}, empty={}, failure={}",
+ duration, successCount.get(), emptyResultCount.get(), failureCount.get());
+
+ // All operations should complete without exception
+ assertEquals(successCount.get(), numOperations,
+ "All operations should complete successfully");
+ assertEquals(failureCount.get(), 0, "No operations should fail");
+
+ // After the first successful trim, subsequent operations should return empty list
+ // (because ledgers are already deleted)
+ assertTrue(emptyResultCount.get() >= numOperations - 1,
+ "At least " + (numOperations - 1) + " operations should return empty list after first trim");
+
+ // Verify final state
+ int finalLedgerCount = ledger.getLedgersInfo().size();
+ assertTrue(finalLedgerCount >= 1 && finalLedgerCount < ledgerIds.length,
+ "Final ledger count should be reduced but not zero. Initial: " + ledgerIds.length
+ + ", Final: " + finalLedgerCount);
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 50ac4fc243414..883a060469acd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger;
import io.netty.buffer.ByteBuf;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
@@ -753,4 +754,36 @@ default void skipNonRecoverableLedger(long ledgerId){}
default void addLedgerEventListener(ManagedLedgerEventListener listener) {
// No-op by default
}
+
+ /**
+ * Async trim consumed ledgers before the specified ledger ID.
+ *
+ * This method deletes all ledgers up to and including the specified ledger ID,
+ * as long as they have been fully consumed by all cursors.
+ *
+ *
Semantics:
+ *
+ * - For current ledger: delete all ledgers BEFORE it (keep current ledger)
+ * - For middle ledger: delete the ledger AND all BEFORE it (do NOT keep boundary)
+ * - If ledgerId doesn't exist: use next lower existing ledger as boundary
+ *
+ *
+ * Example:
+ *
+ * trimConsumedLedgersBefore(L4) where L4 is current: delete L1, L2, L3, keep L4 → returns [L1, L2, L3]
+ * trimConsumedLedgersBefore(L3) where L3 is middle: delete L1, L2, L3, keep L4 → returns [L1, L2, L3]
+ * trimConsumedLedgersBefore(L2) where L2 is middle: delete L1, L2, keep L3, L4 → returns [L1, L2]
+ *
+ *
+ * @param ledgerId the ledger ID to trim before
+ * @return a future that completes with the list of deleted ledger IDs
+ * @throws ManagedLedgerException if ledgers are not fully consumed or operation fails
+ */
+ default CompletableFuture> asyncTrimConsumedLedgersBefore(long ledgerId) {
+ // Default implementation returns a failed future for unsupported operations
+ CompletableFuture> future = new CompletableFuture<>();
+ future.completeExceptionally(new ManagedLedgerException(
+ "asyncTrimConsumedLedgersBefore is not supported."));
+ return future;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index fa0d82552076f..b2b74e8c7c7ad 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -375,11 +375,8 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()))
.thenAccept(bk -> {
- final ManagedLedgerImpl newledger = config.getShadowSource() == null
- ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
- mlOwnershipChecker)
- : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
- mlOwnershipChecker);
+ final ManagedLedgerImpl newledger = createManagedLedgerInstance(bk, store, config,
+ scheduledExecutor, name, mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@@ -1044,6 +1041,26 @@ public CompletableFuture getBookKeeper() {
return bookkeeperFactory.get();
}
+ /**
+ * Factory method to create a ManagedLedgerImpl instance. This method can be overridden
+ * by subclasses to provide custom ManagedLedgerImpl implementations.
+ *
+ * @param bk the BookKeeper client
+ * @param store the metadata store
+ * @param config the managed ledger configuration
+ * @param scheduledExecutor the scheduler for executing tasks
+ * @param name the managed ledger name
+ * @param mlOwnershipChecker supplier to check ownership
+ * @return a new ManagedLedgerImpl instance
+ */
+ protected ManagedLedgerImpl createManagedLedgerInstance(BookKeeper bk, MetaStore store,
+ ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
+ String name, Supplier> mlOwnershipChecker) {
+ return config.getShadowSource() == null
+ ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker)
+ : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker);
+ }
+
/**
* Factory to create Bookkeeper-client for a given ensemblePlacementPolicy.
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d2bd122099db6..17a9486ec1c36 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -216,7 +216,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
* This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store.
* Since we use the store version, we cannot have multiple concurrent updates.
*/
+ @Getter
private final CallbackMutex metadataMutex = new CallbackMutex();
+ @Getter
private final CallbackMutex trimmerMutex = new CallbackMutex();
private final CallbackMutex offloadMutex = new CallbackMutex();
@@ -2200,7 +2202,7 @@ CompletableFuture getLedgerHandle(long ledgerId) {
});
}
- void invalidateReadHandle(long ledgerId) {
+ protected void invalidateReadHandle(long ledgerId) {
CompletableFuture rhf = ledgerCache.remove(ledgerId);
if (rhf != null) {
rhf.thenCompose(r -> {
@@ -2805,7 +2807,7 @@ private boolean isLedgerRetentionOverSizeQuota(long retentionSizeInMB, long tota
return retentionSizeInMB >= 0 && totalSizeOfML - sizeToDelete >= retentionSizeInMB * MegaByte;
}
- boolean isOffloadedNeedsDelete(OffloadContext offload, Optional offloadPolicies) {
+ protected boolean isOffloadedNeedsDelete(OffloadContext offload, Optional offloadPolicies) {
long elapsedMs = clock.millis() - offload.getTimestamp();
return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted()
&& policies.getManagedLedgerOffloadDeletionLagInMillis() != null
@@ -2821,7 +2823,7 @@ void internalTrimConsumedLedgers(CompletableFuture> promise) {
internalTrimLedgers(false, promise);
}
- private Optional getOffloadPoliciesIfAppendable() {
+ protected Optional getOffloadPoliciesIfAppendable() {
LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
if (ledgerOffloader == null
|| !ledgerOffloader.isAppendable()
@@ -2832,7 +2834,7 @@ private Optional getOffloadPoliciesIfAppendable() {
}
@VisibleForTesting
- synchronized List internalEvictOffloadedLedgers() {
+ protected synchronized List internalEvictOffloadedLedgers() {
long inactiveOffloadedLedgerEvictionTimeMs = config.getInactiveOffloadedLedgerEvictionTimeMs();
if (inactiveOffloadedLedgerEvictionTimeMs <= 0) {
return Collections.emptyList();
@@ -3151,7 +3153,7 @@ protected void doDeleteLedgers(List ledgersToDelete) {
* entries and the stats are reported correctly.
*/
@VisibleForTesting
- void advanceCursorsIfNecessary(List ledgersToDelete) throws LedgerNotExistException {
+ protected void advanceCursorsIfNecessary(List ledgersToDelete) throws LedgerNotExistException {
if (ledgersToDelete.isEmpty()) {
return;
}
@@ -3308,7 +3310,7 @@ private CompletableFuture asyncDeleteLedgerFromBookKeeper(long ledgerId) {
return asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
}
- private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
+ protected void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
if (!info.getOffloadContext().getBookkeeperDeleted()) {
// only delete if it hasn't been previously deleted for offload
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
@@ -3322,7 +3324,7 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
}
}
- private CompletableFuture asyncDeleteLedger(long ledgerId, long retry) {
+ protected CompletableFuture asyncDeleteLedger(long ledgerId, long retry) {
CompletableFuture future = new CompletableFuture<>();
asyncDeleteLedgerWithRetry(future, ledgerId, retry);
return future;
@@ -4929,7 +4931,7 @@ private void notifyRollLedgerEvent(LedgerRollEvent event) {
}
}
- private void notifyDeleteLedgerEvent(LedgerInfo... ledgerInfos) {
+ protected void notifyDeleteLedgerEvent(LedgerInfo... ledgerInfos) {
for (ManagedLedgerEventListener listener : ledgerEventListeners) {
try {
listener.onLedgerDelete(ledgerInfos);
diff --git a/pom.xml b/pom.xml
index 473e70eea87b9..9bebb92c7d8db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2462,6 +2462,7 @@ flexible messaging model and an intuitive client API.
buildtools
testmocks
managed-ledger
+ managed-ledger-ext
tiered-storage
pulsar-common
pulsar-broker-common
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 02783622adf43..289ef10bce508 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5033,6 +5033,86 @@ private CompletableFuture trimPartitionedTopic(AsyncResponse asyncResponse
return FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
}
+ protected CompletableFuture> internalTrimConsumedLedgersBefore(
+ AsyncResponse asyncResponse, long ledgerId, boolean authoritative) {
+ if (!topicName.isPersistent()) {
+ log.info("[{}] TrimConsumedLedgersBefore on a non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "TrimConsumedLedgersBefore on a non-persistent topic is not allowed"));
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC);
+ if (topicName.isPartitioned()) {
+ return future.thenCompose((__) -> trimConsumedLedgersBeforeNonPartitionedTopic(
+ asyncResponse, topicName, ledgerId, authoritative));
+ }
+ return future
+ .thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
+ .thenCompose(metadata -> {
+ if (metadata.partitions > 0) {
+ return trimConsumedLedgersBeforePartitionedTopic(
+ asyncResponse, metadata, ledgerId);
+ }
+ return trimConsumedLedgersBeforeNonPartitionedTopic(
+ asyncResponse, topicName, ledgerId, authoritative);
+ });
+ }
+
+ private CompletableFuture> trimConsumedLedgersBeforeNonPartitionedTopic(
+ AsyncResponse asyncResponse, TopicName topicName, long ledgerId, boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.info("[{}] TrimConsumedLedgersBefore on a non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "TrimConsumedLedgersBefore on a non-persistent topic is not allowed"));
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ ManagedLedger managedLedger = persistentTopic.getManagedLedger();
+ if (managedLedger == null) {
+ asyncResponse.resume(Collections.emptyList());
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
+ // Directly call asyncTrimConsumedLedgersBefore on the ManagedLedger interface
+ CompletableFuture> result = managedLedger.asyncTrimConsumedLedgersBefore(ledgerId);
+ return result.whenComplete((res, e) -> {
+ if (e != null) {
+ asyncResponse.resume(e);
+ } else {
+ asyncResponse.resume(res);
+ }
+ });
+ });
+ }
+
+ private CompletableFuture> trimConsumedLedgersBeforePartitionedTopic(
+ AsyncResponse asyncResponse, PartitionedTopicMetadata metadata, long ledgerId) {
+ List>> futures = new ArrayList<>(metadata.partitions);
+ for (int i = 0; i < metadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+ .trimConsumedLedgersBeforeAsync(topicNamePartition.toString(), ledgerId));
+ } catch (Exception e) {
+ log.error("[{}] Failed to trim consumed ledgers before {} for topic {}",
+ clientAppId(), ledgerId, topicNamePartition, e);
+ throw new RestException(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures)
+ .thenApply(v -> {
+ List allDeletedLedgerIds = futures.stream()
+ .flatMap(f -> f.join().stream())
+ .collect(Collectors.toList());
+ asyncResponse.resume(allDeletedLedgerIds);
+ return allDeletedLedgerIds;
+ });
+ }
+
protected CompletableFuture internalGetDispatchRate(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getDispatchRate)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ff28e5bcca374..8227c128f2dd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3540,6 +3540,51 @@ public void trimTopic(
}
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/trimConsumedLedgersBefore/{ledgerId}")
+ @ApiOperation(value = "Trim consumed ledgers before a specific ledger ID", response = List.class,
+ responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Operation successful", response = List.class,
+ responseContainer = "List"),
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic does not exist"),
+ @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
+ @ApiResponse(code = 412, message = "Topic name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+ public void trimConsumedLedgersBefore(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Ledger ID to trim before", required = true)
+ @PathParam("ledgerId") long ledgerId,
+ @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalTrimConsumedLedgersBefore(asyncResponse, ledgerId, authoritative)
+ .exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to trim consumed ledgers before {} for topic {}",
+ clientAppId(), ledgerId, topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Get dispatch rate configuration for specified topic.", response = DispatchRateImpl.class)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTrimConsumedLedgersBeforeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTrimConsumedLedgersBeforeTest.java
new file mode 100644
index 0000000000000..f3b78d5cbe955
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTrimConsumedLedgersBeforeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
+import java.util.Set;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test class for asyncTrimConsumedLedgersBefore admin API.
+ *
+ * This test verifies that the admin API methods exist and are callable.
+ * Full functional testing with ManagedLedgerClientFactoryExt is in
+ * the managed-ledger-ext module to avoid cyclic dependencies.
+ *
+ * Note: Due to module dependency constraints (managed-ledger-ext depends
+ * on pulsar-broker), we cannot add managed-ledger-ext as a test dependency
+ * in pulsar-broker without creating a cyclic dependency.
+ */
+@Test(groups = "broker-admin")
+public class AdminApiTrimConsumedLedgersBeforeTest extends MockedPulsarServiceBaseTest {
+
+ private final String testTenant = "trim-test";
+ private final String testNamespace = "ns1";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+
+ // Setup namespaces
+ admin.clusters().createCluster("test", ClusterData.builder()
+ .serviceUrl(pulsar.getWebServiceAddress()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+ admin.tenants().createTenant(testTenant, tenantInfo);
+ admin.namespaces().createNamespace(myNamespace, Set.of("test"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ /**
+ * Test that the admin API methods exist and have correct signatures.
+ */
+ @Test
+ public void testTrimConsumedLedgersBeforeApiExists() {
+ // This test verifies that the admin API methods are available
+ // with the correct signatures
+
+ // Test synchronous method exists
+ try {
+ admin.topics().getClass().getMethod("trimConsumedLedgersBefore", String.class, long.class);
+ } catch (NoSuchMethodException e) {
+ fail("trimConsumedLedgersBefore method should exist on Topics interface");
+ }
+
+ // Test asynchronous method exists
+ try {
+ admin.topics().getClass().getMethod("trimConsumedLedgersBeforeAsync", String.class, long.class);
+ } catch (NoSuchMethodException e) {
+ fail("trimConsumedLedgersBeforeAsync method should exist on Topics interface");
+ }
+ }
+
+ /**
+ * Test that the API methods are callable through admin client.
+ * With default ManagedLedgerImpl, the operation will fail appropriately.
+ */
+ @Test
+ public void testTrimConsumedLedgersBeforeApiCallable() throws Exception {
+ String topicName = "persistent://" + myNamespace + "/test-api-callable";
+
+ // Create a topic to ensure it exists
+ try (Producer producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create()) {
+ producer.send("test-message".getBytes());
+ }
+
+ // Test that the API methods are callable
+ // (with default implementation, they should fail appropriately)
+ try {
+ admin.topics().trimConsumedLedgersBefore(topicName, 12345L);
+ fail("Should have thrown PulsarAdminException because "
+ + "asyncTrimConsumedLedgersBefore is not supported by default implementation");
+ } catch (PulsarAdminException e) {
+ // Expected - the default implementation doesn't support this operation
+ assertNotNull(e.getMessage(), "Exception message should not be null");
+ }
+ }
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index cf6073cdc0444..f22bad3a86c02 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2075,6 +2075,33 @@ CompletableFuture updateSubscriptionPropertiesAsync(String topic, String
*/
CompletableFuture trimTopicAsync(String topic);
+ /**
+ * Trim consumed ledgers before a specific ledger ID.
+ *
+ * This operation deletes all ledgers that are fully consumed before the specified ledger ID.
+ * The ledger ID can be any value - the system will adjust to use the appropriate boundary:
+ *
+ * - If the ledger ID exists, all ledgers before it (exclusive) will be deleted
+ * - If the ledger ID is greater than the last ledger, the current ledger is used as boundary
+ * - If the ledger ID falls in a gap, the next lower existing ledger is used as boundary
+ * - If the ledger ID is less than the first ledger, no action is taken
+ *
+ *
+ * @param topic The topic name
+ * @param ledgerId The ledger ID to trim before
+ * @throws PulsarAdminException if the operation fails
+ */
+ List trimConsumedLedgersBefore(String topic, long ledgerId) throws PulsarAdminException;
+
+ /**
+ * Trim consumed ledgers before a specific ledger ID asynchronously.
+ *
+ * @param topic The topic name
+ * @param ledgerId The ledger ID to trim before
+ * @return A CompletableFuture that completes with the list of deleted ledger IDs
+ */
+ CompletableFuture> trimConsumedLedgersBeforeAsync(String topic, long ledgerId);
+
/**
* Check the status of an ongoing compaction for a topic.
*
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 90612b148c970..df8e4c49c128d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1166,6 +1166,44 @@ public CompletableFuture trimTopicAsync(String topic) {
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
+ @Override
+ public List trimConsumedLedgersBefore(String topic, long ledgerId) throws PulsarAdminException {
+ return sync(() -> trimConsumedLedgersBeforeAsync(topic, ledgerId));
+ }
+
+ @Override
+ public CompletableFuture> trimConsumedLedgersBeforeAsync(String topic, long ledgerId) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "trimConsumedLedgersBefore", String.valueOf(ledgerId));
+ final CompletableFuture> future = new CompletableFuture<>();
+ try {
+ request(path).async().post(Entity.entity("", MediaType.APPLICATION_JSON),
+ new InvocationCallback() {
+ @Override
+ public void completed(Response response) {
+ int status = response.getStatus();
+ if (status != Response.Status.OK.getStatusCode()) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ try {
+ future.complete(response.readEntity(new GenericType>() {}));
+ } catch (Exception e) {
+ future.completeExceptionally(getApiException(e));
+ }
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ } catch (PulsarAdminException cae) {
+ future.completeExceptionally(cae);
+ }
+ return future;
+ }
+
@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index aaa15a4822341..7bf1acfdbc1e8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -265,6 +265,7 @@ public CmdTopics(Supplier admin) {
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());
jcommander.addCommand("trim-topic", new TrimTopic());
+ jcommander.addCommand("trim-consumed-ledgers-before", new TrimConsumedLedgersBefore());
initDeprecatedCommands();
}
@@ -3156,4 +3157,33 @@ void run() throws PulsarAdminException {
getAdmin().topics().trimTopic(topic);
}
}
+
+ @Parameters(commandDescription = "Trim consumed ledgers before a specific ledger ID")
+ private class TrimConsumedLedgersBefore extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic ledgerId", required = true)
+ private java.util.List params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ if (params.size() < 2) {
+ throw new ParameterException("Topic name and ledger ID are required");
+ }
+ String topic = validateTopicName(params.subList(0, 1));
+ long ledgerId;
+ try {
+ ledgerId = Long.parseLong(params.get(1));
+ } catch (NumberFormatException e) {
+ throw new ParameterException("Invalid ledger ID: " + params.get(1));
+ }
+ List deletedLedgerIds = getAdmin().topics().trimConsumedLedgersBefore(topic, ledgerId);
+ if (deletedLedgerIds.isEmpty()) {
+ System.out.println("No ledgers were deleted");
+ } else {
+ System.out.println("Deleted " + deletedLedgerIds.size() + " ledger(s):");
+ for (Long id : deletedLedgerIds) {
+ System.out.println(" " + id);
+ }
+ }
+ }
+ }
}