Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Set<Long> deletedOffloads() {
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>();


OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("S3", "", "", "",
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl .create("S3", "", "", "",
null, null,
null, null,
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,7 @@ Set<Long> deletedOffloads() {
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,37 @@ protected void internalSetOffloadThreshold(long newThreshold) {
}
}

protected CompletableFuture<Void> internalSetOffloadThresholdInSecondsAsync(long newThreshold) {
CompletableFuture<Void> f = new CompletableFuture<>();

validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
.thenApply(v -> validatePoliciesReadOnlyAccessAsync())
.thenApply(v -> updatePoliciesAsync(namespaceName,
policies -> {
if (policies.offload_policies == null) {
policies.offload_policies = new OffloadPoliciesImpl();
}
((OffloadPoliciesImpl) policies.offload_policies)
.setManagedLedgerOffloadThresholdInSeconds(newThreshold);
policies.offload_threshold_in_seconds = newThreshold;
return policies;
})
)
.thenAccept(v -> {
log.info("[{}] Successfully updated offloadThresholdInSeconds configuration:"
+ " namespace={}, value={}", clientAppId(), namespaceName, newThreshold);
f.complete(null);
})
.exceptionally(t -> {
log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}",
clientAppId(), namespaceName, t);
f.completeExceptionally(new RestException(t));
return null;
});

return f;
}

protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,58 @@ public void setOffloadThreshold(@PathParam("tenant") String tenant,
internalSetOffloadThreshold(newThreshold);
}

@GET
@Path("/{tenant}/{namespace}/offloadThresholdInSeconds")
@ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
public void getOffloadThresholdInSeconds(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> {
if (policies.offload_policies == null
|| policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds() == null) {
asyncResponse.resume(policies.offload_threshold_in_seconds);
} else {
asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds());
}
})
.exceptionally(ex -> {
log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@PUT
@Path("/{tenant}/{namespace}/offloadThresholdInSeconds")
@ApiOperation(value = "Set maximum number of seconds stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "offloadThresholdInSeconds value is not valid") })
public void setOffloadThresholdInSeconds(
@Suspended final AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
long newThreshold) {
validateNamespaceName(tenant, namespace);
internalSetOffloadThresholdInSecondsAsync(newThreshold)
.thenAccept(response::resume)
.exceptionally(t -> {
resumeAsyncResponseExceptionally(response, t);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/offloadDeletionLagMs")
@ApiOperation(value = "Number of milliseconds to wait before deleting a ledger segment which has been offloaded"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,13 @@ public void testOffloadPolicies() throws Exception {
String bucket = "test-bucket";
String endpoint = "test-endpoint";
long offloadThresholdInBytes = 0;
long offloadThresholdInSeconds = 100;
long offloadDeletionLagInMillis = 100L;
OffloadedReadPriority priority = OffloadedReadPriority.TIERED_STORAGE_FIRST;

OffloadPolicies offload1 = OffloadPoliciesImpl.create(
driver, region, bucket, endpoint, null, null, null, null,
100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority);
100, 100, offloadThresholdInBytes, offloadThresholdInSeconds, offloadDeletionLagInMillis, priority);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
Expand Down Expand Up @@ -239,6 +240,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception {

OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl();
namespacePolicies.setManagedLedgerOffloadThresholdInBytes(100L);
namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(100L);
namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
namespacePolicies.setManagedLedgerOffloadDriver("s3");
namespacePolicies.setManagedLedgerOffloadBucket("buck");
Expand All @@ -250,6 +252,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception {

OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl();
topicPolicies.setManagedLedgerOffloadThresholdInBytes(200L);
topicPolicies.setManagedLedgerOffloadThresholdInSeconds(200L);
topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L);
topicPolicies.setManagedLedgerOffloadDriver("s3");
topicPolicies.setManagedLedgerOffloadBucket("buck2");
Expand All @@ -267,6 +270,78 @@ public void testOffloadPoliciesAppliedApi() throws Exception {
-> assertEquals(admin.topics().getOffloadPolicies(topicName, true), brokerPolicies));
}


@Test
public void testSetNamespaceOffloadPolicies() throws Exception {
conf.setManagedLedgerOffloadThresholdInSeconds(100);
conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100);

OffloadPoliciesImpl policies = new OffloadPoliciesImpl();
policies.setManagedLedgerOffloadThresholdInBytes(200L);
policies.setManagedLedgerOffloadThresholdInSeconds(200L);
policies.setManagedLedgerOffloadDeletionLagInMillis(400L);
policies.setManagedLedgerOffloadDriver("s3");
policies.setManagedLedgerOffloadBucket("buck2");

admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300);
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300);

admin.namespaces().setOffloadPolicies(myNamespace, policies);
assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), policies);
}

@Test
public void testSetTopicOffloadPolicies() throws Exception {
conf.setManagedLedgerOffloadThresholdInSeconds(100);
conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100);

LedgerOffloader topicOffloader = mock(LedgerOffloader.class);
when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());

OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl();
namespacePolicies.setManagedLedgerOffloadThresholdInBytes(200L);
namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(200L);
namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(400L);
namespacePolicies.setManagedLedgerOffloadDriver("s3");
namespacePolicies.setManagedLedgerOffloadBucket("buck2");

admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300);
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300);

admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies);
assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), namespacePolicies);

OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl();
topicPolicies.setManagedLedgerOffloadThresholdInBytes(500L);
topicPolicies.setManagedLedgerOffloadThresholdInSeconds(500L);
topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L);
topicPolicies.setManagedLedgerOffloadDriver("s3");
topicPolicies.setManagedLedgerOffloadBucket("buck2");

String topicName = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topicName);
admin.topicPolicies().setOffloadPolicies(topicName, topicPolicies);

// Wait until broker update policies finished.
for (int a = 1; a <= 5; a++) {
try {
OffloadPolicies policies = admin.topicPolicies().getOffloadPolicies(topicName);

assertEquals(policies.getManagedLedgerOffloadThresholdInSeconds(),
topicPolicies.getManagedLedgerOffloadThresholdInSeconds());
assertEquals(policies.getManagedLedgerOffloadThresholdInBytes(),
topicPolicies.getManagedLedgerOffloadThresholdInBytes());
} catch (Exception e) {
if (a == 5) {
throw e;
} else {
Thread.sleep(1000L);
}
}
}
}

@Test
public void testTopicLevelOffloadPartitioned() throws Exception {
testOffload(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1455,15 +1455,18 @@ public void testSetOffloadThreshold() throws Exception {
System.out.println(namespace);
// set a default
pulsar.getConfiguration().setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(1);
pulsar.getConfiguration().setManagedLedgerOffloadThresholdInSeconds(100);
// create the namespace
admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));
admin.topics().createNonPartitionedTopic(topicName.toString());

admin.namespaces().setOffloadDeleteLag(namespace, 10000, TimeUnit.SECONDS);
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1);

// assert we get the default which indicates it will fall back to default
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1);
// the ledger config should have the expected value
ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "",
Expand All @@ -1472,15 +1475,21 @@ public void testSetOffloadThreshold() throws Exception {
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
admin.namespaces().getOffloadThresholdInSeconds(namespace),
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
Long.valueOf(-1));
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(),
Long.valueOf(-1));


// set an override for the namespace
admin.namespaces().setOffloadThreshold(namespace, 100);
assertEquals(100, admin.namespaces().getOffloadThreshold(namespace));
admin.namespaces().setOffloadThresholdInSeconds(namespace, 100);
assertEquals(admin.namespaces().getOffloadThreshold(namespace), 100);
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), 100);
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
admin.namespaces().getOffloadPolicies(namespace);
offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "",
Expand All @@ -1489,14 +1498,18 @@ public void testSetOffloadThreshold() throws Exception {
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
admin.namespaces().getOffloadThresholdInSeconds(namespace),
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
Long.valueOf(100));
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(),
Long.valueOf(100));

// set another negative value to disable
admin.namespaces().setOffloadThreshold(namespace, -2);
admin.namespaces().setOffloadThresholdInSeconds(namespace, -2);
assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "",
Expand All @@ -1505,14 +1518,18 @@ public void testSetOffloadThreshold() throws Exception {
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
admin.namespaces().getOffloadThresholdInSeconds(namespace),
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
Long.valueOf(-2));
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(),
Long.valueOf(-2));

// set back to -1 and fall back to default
admin.namespaces().setOffloadThreshold(namespace, -1);
admin.namespaces().setOffloadThresholdInSeconds(namespace, -1);
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "",
Expand All @@ -1521,11 +1538,14 @@ public void testSetOffloadThreshold() throws Exception {
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
admin.namespaces().getOffloadThresholdInSeconds(namespace),
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
Long.valueOf(-1));
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(),
Long.valueOf(-1));

// cleanup
admin.topics().delete(topicName.toString(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ public void testReplicatorOffloadPolicies() throws Exception {
init(namespace, persistentTopicName);
OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null,
8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST);
8, 9, 10L, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST);
// local
try {
admin1.topicPolicies().setOffloadPolicies(persistentTopicName, offloadPolicies);
Expand Down
Loading