From 286ad6c7a6dd3b23ba844398596986a926bb22b4 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 26 Oct 2022 20:17:49 +0800 Subject: [PATCH 01/11] review fix --- .../mledger/impl/OffloadLedgerDeleteTest.java | 1 + .../mledger/impl/OffloadPrefixReadTest.java | 3 +- .../mledger/impl/OffloadPrefixTest.java | 1 + .../broker/admin/impl/NamespacesBase.java | 26 ++++++ .../pulsar/broker/admin/v1/Namespaces.java | 47 +++++++++++ .../pulsar/broker/admin/NamespacesTest.java | 20 +++++ .../pulsar/client/admin/Namespaces.java | 83 +++++++++++++++++++ .../pulsar/common/policies/data/Policies.java | 5 +- .../client/admin/internal/NamespacesImpl.java | 22 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 14 ++-- .../pulsar/admin/cli/CmdNamespaces.java | 18 +++- .../policies/data/OffloadPoliciesImpl.java | 2 + .../policies/data/OffloadPoliciesTest.java | 6 ++ 13 files changed, 238 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index c54cee0765384..56da315553ea4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -82,6 +82,7 @@ Set deletedOffloads() { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS, OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 718d49f986110..cd224e33e2734 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -202,12 +202,13 @@ static class MockLedgerOffloader implements LedgerOffloader { ConcurrentHashMap offloads = new ConcurrentHashMap(); - OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("S3", "", "", "", + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl .create("S3", "", "", "", null, null, null, null, OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS, OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 9edcbeb64814b..2cdb14fb71e41 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -1262,6 +1262,7 @@ Set deletedOffloads() { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS, OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 59e1226e16c71..3dacc5471c144 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1965,6 +1965,32 @@ protected void internalSetOffloadThreshold(long newThreshold) { } } + protected void internalSetOffloadThresholdInSeconds(long newThreshold) { + validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + + try { + updatePolicies(namespaceName, policies -> { + if (policies.offload_policies == null) { + policies.offload_policies = new OffloadPoliciesImpl(); + } + ((OffloadPoliciesImpl) policies.offload_policies) + .setManagedLedgerOffloadThresholdInSeconds(newThreshold); + policies.offload_threshold_in_seconds = newThreshold; + return policies; + }); + log.info("[{}] Successfully updated offloadThresholdInSeconds configuration: namespace={}, value={}", + clientAppId(), namespaceName, newThreshold); + + } catch (RestException pfe) { + throw pfe; + } catch (Exception e) { + log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", + clientAppId(), namespaceName, e); + throw new RestException(e); + } + } + protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 8f81c24502904..21639ef463442 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1673,6 +1673,53 @@ public void setOffloadThreshold(@PathParam("property") String property, internalSetOffloadThreshold(newThreshold); } + + @GET + @Path("/{property}/{cluster}/{namespace}/offloadThresholdInSeconds") + @ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic," + + " before the broker will start offloading to longterm storage", + notes = "A negative value disables automatic offloading") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist") }) + public void getOffloadThresholdInSeconds( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, cluster, namespace); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> { + if (policies.offload_policies == null) { + asyncResponse.resume(policies.offload_threshold_in_seconds); + } else { + asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds()); + } + }) + .exceptionally(ex -> { + log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @PUT + @Path("/{property}/{cluster}/{namespace}/offloadThresholdInSeconds") + @ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic," + + " before the broker will start offloading to longterm storage", + notes = "A negative value disables automatic offloading") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "offloadThreshold value is not valid") }) + public void setOffloadThresholdInSeconds(@PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + long newThreshold) { + validateNamespaceName(property, cluster, namespace); + internalSetOffloadThresholdInSeconds(newThreshold); + } + @GET @Path("/{tenant}/{cluster}/{namespace}/schemaAutoUpdateCompatibilityStrategy") @ApiOperation(value = "The strategy used to check the compatibility of new schemas," diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 51c68ea0e89df..e65597c1eff46 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1398,15 +1398,18 @@ public void testSetOffloadThreshold() throws Exception { System.out.println(namespace); // set a default pulsar.getConfiguration().setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(1); + pulsar.getConfiguration().setManagedLedgerOffloadThresholdInSeconds(100); // create the namespace admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster)); admin.topics().createNonPartitionedTopic(topicName.toString()); admin.namespaces().setOffloadDeleteLag(namespace, 10000, TimeUnit.SECONDS); assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); + assertEquals(-1, admin.namespaces().getOffloadThresholdInSeconds(namespace)); // assert we get the default which indicates it will fall back to default assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); + assertEquals(-1, admin.namespaces().getOffloadThresholdInSeconds(namespace)); // the ledger config should have the expected value ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1415,15 +1418,21 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(-1)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(-1)); + // set an override for the namespace admin.namespaces().setOffloadThreshold(namespace, 100); + admin.namespaces().setOffloadThresholdInSeconds(namespace, 100); assertEquals(100, admin.namespaces().getOffloadThreshold(namespace)); + assertEquals(100, admin.namespaces().getOffloadThresholdInSeconds(namespace)); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); admin.namespaces().getOffloadPolicies(namespace); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1432,14 +1441,18 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(100)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(100)); // set another negative value to disable admin.namespaces().setOffloadThreshold(namespace, -2); + admin.namespaces().setOffloadThresholdInSeconds(namespace, -2); assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace)); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1448,14 +1461,18 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(-2)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(-2)); // set back to -1 and fall back to default admin.namespaces().setOffloadThreshold(namespace, -1); + admin.namespaces().setOffloadThresholdInSeconds(namespace, -1); assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1464,11 +1481,14 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(-1)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(-1)); // cleanup admin.topics().delete(topicName.toString(), true); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 16d4e155635d8..f4c284bb48434 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -3532,6 +3532,45 @@ CompletableFuture removeMaxUnackedMessagesPerSubscriptionAsync( */ CompletableFuture getOffloadThresholdAsync(String namespace); + + /** + * Get the offloadThresholdInSeconds for a namespace. + * + *

+ * Response example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + long getOffloadThresholdInSeconds(String namespace) throws PulsarAdminException; + + + /** + * Get the offloadThresholdInSeconds for a namespace. + * + *

+ * Response example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + */ + CompletableFuture getOffloadThresholdInSecondsAsync(String namespace); + /** * Set the offloadThreshold for a namespace. *

@@ -3581,6 +3620,50 @@ CompletableFuture removeMaxUnackedMessagesPerSubscriptionAsync( */ CompletableFuture setOffloadThresholdAsync(String namespace, long offloadThreshold); + + /** + * Set the offloadThresholdInSeconds for a namespace. + *

+ * Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible. + *

+ * Request example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + * @param offloadThresholdInSeconds + * maximum number of bytes stored before offloading is triggered + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setOffloadThresholdInSeconds(String namespace, long offloadThresholdInSeconds) throws PulsarAdminException; + + /** + * Set the offloadThresholdInSeconds for a namespace. + *

+ * Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible. + *

+ * Request example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + * @param offloadThresholdInSeconds + * maximum number of seconds stored before offloading is triggered + */ + CompletableFuture setOffloadThresholdInSecondsAsync(String namespace, long offloadThresholdInSeconds); + /** * Get the offload deletion lag for a namespace, in milliseconds. * The number of milliseconds to wait before deleting a ledger segment which has been offloaded from diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ae469e9c5966f..066fdf1df4f09 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -94,6 +94,8 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public long offload_threshold = -1; @SuppressWarnings("checkstyle:MemberName") + public long offload_threshold_in_seconds = -1; + @SuppressWarnings("checkstyle:MemberName") public Long offload_deletion_lag_ms = null; @SuppressWarnings("checkstyle:MemberName") public Integer max_topics_per_namespace = null; @@ -145,7 +147,7 @@ public int hashCode() { max_producers_per_topic, max_consumers_per_topic, max_consumers_per_subscription, max_unacked_messages_per_consumer, max_unacked_messages_per_subscription, - compaction_threshold, offload_threshold, + compaction_threshold, offload_threshold, offload_threshold_in_seconds, offload_deletion_lag_ms, schema_auto_update_compatibility_strategy, schema_validation_enforced, @@ -191,6 +193,7 @@ public boolean equals(Object obj) { && Objects.equals(max_consumers_per_subscription, other.max_consumers_per_subscription) && Objects.equals(compaction_threshold, other.compaction_threshold) && offload_threshold == other.offload_threshold + && offload_threshold_in_seconds == other.offload_threshold_in_seconds && Objects.equals(offload_deletion_lag_ms, other.offload_deletion_lag_ms) && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy && schema_validation_enforced == other.schema_validation_enforced diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 3d193c827732e..25eded8a20b9c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1518,6 +1518,16 @@ public CompletableFuture getOffloadThresholdAsync(String namespace) { return asyncGetNamespaceParts(new FutureCallback(){}, namespace, "offloadThreshold"); } + @Override + public long getOffloadThresholdInSeconds(String namespace) throws PulsarAdminException { + return sync(() -> getOffloadThresholdInSecondsAsync(namespace)); + } + + @Override + public CompletableFuture getOffloadThresholdInSecondsAsync(String namespace) { + return asyncGetNamespaceParts(new FutureCallback(){}, namespace, "offloadThresholdInSeconds"); + } + @Override public void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException { sync(() -> setOffloadThresholdAsync(namespace, offloadThreshold)); @@ -1530,6 +1540,18 @@ public CompletableFuture setOffloadThresholdAsync(String namespace, long o return asyncPutRequest(path, Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON)); } + @Override + public void setOffloadThresholdInSeconds(String namespace, long offloadThresholdInSeconds) throws PulsarAdminException { + sync(() -> setOffloadThresholdInSecondsAsync(namespace, offloadThresholdInSeconds)); + } + + @Override + public CompletableFuture setOffloadThresholdInSecondsAsync(String namespace, long offloadThresholdInSeconds) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offloadThresholdInSeconds"); + return asyncPutRequest(path, Entity.entity(offloadThresholdInSeconds, MediaType.APPLICATION_JSON)); + } + @Override public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException { return sync(() -> getOffloadDeleteLagMsAsync(namespace)); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 51ef3d1524df3..58c974ad2923a 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -804,11 +804,11 @@ public void namespaces() throws Exception { verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1"); namespaces.run(split( - "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first")); + "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -aots 100 -oae 10s -orp tiered-storage-first")); verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, - 10 * 1024 * 1024L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + 10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST)); namespaces.run(split("remove-offload-policies myprop/clust/ns1")); verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1"); @@ -968,7 +968,7 @@ public void topicPolicies() throws Exception { verify(mockTopicsPolicies) .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + 8, 9, 10L, null, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getRetention("persistent://myprop/clust/ns1/ds1", false); @@ -1414,11 +1414,11 @@ public void topicPolicies() throws Exception { verify(mockGlobalTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + - " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first -g")); + " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -oats 100 -orp tiered-storage-first -g")); verify(mockGlobalTopicsPolicies) .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + 8, 9, 10L, 100L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); } @Test @@ -1701,10 +1701,10 @@ public void topics() throws Exception { cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ; - cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first")); + cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -oats 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first")); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST); + 8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST); verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies); cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index aba0a6cda547c..b64df272b4468 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -2273,6 +2273,13 @@ private class SetOffloadPolicies extends CliCommand { required = false) private String offloadAfterThresholdStr; + @Parameter( + names = {"--offloadAfterThresholdInSeconds", "-oats"}, + description = "Offload after threshold seconds (eg: 1,5,10)", + required = false + ) + private String offloadAfterThresholdInSecondsStr; + @Parameter( names = {"--offloadedReadPriority", "-orp"}, description = "Read priority for offloaded messages. By default, once messages are offloaded to " @@ -2366,6 +2373,15 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { offloadAfterThresholdInBytes = offloadAfterThreshold; } } + + Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS; + if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) { + long offloadThresholdInSeconds0 = Long.parseLong(offloadAfterThresholdInSecondsStr.trim()); + if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) { + offloadThresholdInSeconds = offloadThresholdInSeconds0; + } + } + OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY; if (this.offloadReadPriorityStr != null) { @@ -2384,7 +2400,7 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { s3Role, s3RoleSessionName, awsId, awsSecret, maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes, - offloadAfterElapsedInMillis, offloadedReadPriority); + offloadThresholdInSeconds, offloadAfterElapsedInMillis, offloadedReadPriority); getAdmin().namespaces().setOffloadPolicies(namespace, offloadPolicies); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 9b4e8506c89f6..7a7854b62e46f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -183,11 +183,13 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu String credentialId, String credentialSecret, Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes, Long offloadThresholdInBytes, + Long offloadThresholdInSeconds, Long offloadDeletionLagInMillis, OffloadedReadPriority readPriority) { OffloadPoliciesImplBuilder builder = builder() .managedLedgerOffloadDriver(driver) .managedLedgerOffloadThresholdInBytes(offloadThresholdInBytes) + .managedLedgerOffloadThresholdInSeconds(offloadThresholdInSeconds) .managedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInMillis) .managedLedgerOffloadBucket(bucket) .managedLedgerOffloadRegion(region) diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index f6496af2cfe72..38e85d8047981 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -50,6 +50,7 @@ public void testS3Configuration() { final Integer maxBlockSizeInBytes = 5 * M; final Integer readBufferSizeInBytes = 2 * M; final Long offloadThresholdInBytes = 10L * M; + final Long offloadThresholdInSeconds = 1000L; final Long offloadDeletionLagInMillis = 5L * MIN; OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create( @@ -64,6 +65,7 @@ public void testS3Configuration() { maxBlockSizeInBytes, readBufferSizeInBytes, offloadThresholdInBytes, + offloadThresholdInSeconds, offloadDeletionLagInMillis, OffloadedReadPriority.TIERED_STORAGE_FIRST ); @@ -78,6 +80,7 @@ public void testS3Configuration() { offloadThresholdInBytes); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), Long.valueOf(offloadDeletionLagInMillis)); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), offloadThresholdInSeconds); } @Test @@ -93,6 +96,7 @@ public void testGcsConfiguration() { final Integer maxBlockSizeInBytes = 5 * M; final Integer readBufferSizeInBytes = 2 * M; final Long offloadThresholdInBytes = 0L; + final Long offloadThresholdInSeconds = 1000L; final Long offloadDeletionLagInMillis = 5 * MIN; final OffloadedReadPriority readPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST; @@ -108,6 +112,7 @@ public void testGcsConfiguration() { maxBlockSizeInBytes, readBufferSizeInBytes, offloadThresholdInBytes, + offloadThresholdInSeconds, offloadDeletionLagInMillis, readPriority ); @@ -120,6 +125,7 @@ public void testGcsConfiguration() { Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), offloadThresholdInBytes); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), offloadDeletionLagInMillis); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority(), readPriority); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), offloadThresholdInSeconds); } @Test From 929f53bafed31ec4095a085cbc781ce720c6d7ea Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 27 Oct 2022 16:00:01 +0800 Subject: [PATCH 02/11] fix --- .../pulsar/broker/admin/v1/Namespaces.java | 47 -------- .../pulsar/broker/admin/v2/Namespaces.java | 44 ++++++++ .../broker/admin/AdminApiOffloadTest.java | 101 +++++++++++++++++- .../service/ReplicatorTopicPoliciesTest.java | 2 +- .../client/admin/internal/NamespacesImpl.java | 3 +- .../pulsar/admin/cli/CmdTopicPolicies.java | 7 +- .../apache/pulsar/admin/cli/CmdTopics.java | 7 +- .../sql/presto/TestPulsarSplitManager.java | 1 + 8 files changed, 160 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 21639ef463442..8f81c24502904 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1673,53 +1673,6 @@ public void setOffloadThreshold(@PathParam("property") String property, internalSetOffloadThreshold(newThreshold); } - - @GET - @Path("/{property}/{cluster}/{namespace}/offloadThresholdInSeconds") - @ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic," - + " before the broker will start offloading to longterm storage", - notes = "A negative value disables automatic offloading") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Namespace doesn't exist") }) - public void getOffloadThresholdInSeconds( - @Suspended final AsyncResponse asyncResponse, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { - validateNamespaceName(property, cluster, namespace); - validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(policies -> { - if (policies.offload_policies == null) { - asyncResponse.resume(policies.offload_threshold_in_seconds); - } else { - asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds()); - } - }) - .exceptionally(ex -> { - log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - - @PUT - @Path("/{property}/{cluster}/{namespace}/offloadThresholdInSeconds") - @ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic," - + " before the broker will start offloading to longterm storage", - notes = "A negative value disables automatic offloading") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Namespace doesn't exist"), - @ApiResponse(code = 409, message = "Concurrent modification"), - @ApiResponse(code = 412, message = "offloadThreshold value is not valid") }) - public void setOffloadThresholdInSeconds(@PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, - long newThreshold) { - validateNamespaceName(property, cluster, namespace); - internalSetOffloadThresholdInSeconds(newThreshold); - } - @GET @Path("/{tenant}/{cluster}/{namespace}/schemaAutoUpdateCompatibilityStrategy") @ApiOperation(value = "The strategy used to check the compatibility of new schemas," diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 63120e47b8d84..fb8d65c41ac75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2088,6 +2088,50 @@ public void setOffloadThreshold(@PathParam("tenant") String tenant, internalSetOffloadThreshold(newThreshold); } + @GET + @Path("/{tenant}/{namespace}/offloadThresholdInSeconds") + @ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic," + + " before the broker will start offloading to longterm storage", + notes = "A negative value disables automatic offloading") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist") }) + public void getOffloadThresholdInSeconds( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> { + if (policies.offload_policies == null) { + asyncResponse.resume(policies.offload_threshold_in_seconds); + } else { + asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds()); + } + }) + .exceptionally(ex -> { + log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @PUT + @Path("/{tenant}/{namespace}/offloadThresholdInSeconds") + @ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic," + + " before the broker will start offloading to longterm storage", + notes = "A negative value disables automatic offloading") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "offloadThreshold value is not valid") }) + public void setOffloadThresholdInSeconds(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + long newThreshold) { + validateNamespaceName(tenant, namespace); + internalSetOffloadThresholdInSeconds(newThreshold); + } + @GET @Path("/{tenant}/{namespace}/offloadDeletionLagMs") @ApiOperation(value = "Number of milliseconds to wait before deleting a ledger segment which has been offloaded" diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 862ce926581b3..1746a0e95daef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -52,9 +52,14 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -186,12 +191,13 @@ public void testOffloadPolicies() throws Exception { String bucket = "test-bucket"; String endpoint = "test-endpoint"; long offloadThresholdInBytes = 0; + long offloadThresholdInSeconds = 100; long offloadDeletionLagInMillis = 100L; OffloadedReadPriority priority = OffloadedReadPriority.TIERED_STORAGE_FIRST; OffloadPolicies offload1 = OffloadPoliciesImpl.create( driver, region, bucket, endpoint, null, null, null, null, - 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority); + 100, 100, offloadThresholdInBytes, offloadThresholdInSeconds, offloadDeletionLagInMillis, priority); admin.namespaces().setOffloadPolicies(namespaceName, offload1); OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName); assertEquals(offload1, offload2); @@ -239,6 +245,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception { OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl(); namespacePolicies.setManagedLedgerOffloadThresholdInBytes(100L); + namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(100L); namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(200L); namespacePolicies.setManagedLedgerOffloadDriver("s3"); namespacePolicies.setManagedLedgerOffloadBucket("buck"); @@ -250,6 +257,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception { OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl(); topicPolicies.setManagedLedgerOffloadThresholdInBytes(200L); + topicPolicies.setManagedLedgerOffloadThresholdInSeconds(200L); topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L); topicPolicies.setManagedLedgerOffloadDriver("s3"); topicPolicies.setManagedLedgerOffloadBucket("buck2"); @@ -267,6 +275,97 @@ public void testOffloadPoliciesAppliedApi() throws Exception { -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), brokerPolicies)); } + + @Test + public void testSetNamespaceOffloadPolicies() throws Exception { + conf.setManagedLedgerOffloadThresholdInSeconds(100); + conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100); + + OffloadPoliciesImpl policies = new OffloadPoliciesImpl(); + policies.setManagedLedgerOffloadThresholdInBytes(200L); + policies.setManagedLedgerOffloadThresholdInSeconds(200L); + policies.setManagedLedgerOffloadDeletionLagInMillis(400L); + policies.setManagedLedgerOffloadDriver("s3"); + policies.setManagedLedgerOffloadBucket("buck2"); + + admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300); + assertEquals(300, admin.namespaces().getOffloadThresholdInSeconds(myNamespace)); + + admin.namespaces().setOffloadPolicies(myNamespace, policies); + assertEquals(policies, admin.namespaces().getOffloadPolicies(myNamespace)); + + String topicName = testTopic + UUID.randomUUID(); + try { + Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get(10, TimeUnit.SECONDS); + assertNotNull(topic); + + assertTrue(topic instanceof PersistentTopic); + + PersistentTopic persistentTopic = (PersistentTopic) topic; + ManagedLedger ledger = persistentTopic.getManagedLedger(); + ManagedLedgerConfig config = ledger.getConfig(); + OffloadPolicies policies1 = config.getLedgerOffloader().getOffloadPolicies(); + + assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), policies.getManagedLedgerOffloadThresholdInBytes()); + assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(), policies.getManagedLedgerOffloadThresholdInSeconds()); + } finally { + pulsar.getBrokerService().deleteTopic(topicName, true); + } + } + + @Test + public void testSetTopicOffloadPolicies() throws Exception { + conf.setManagedLedgerOffloadThresholdInSeconds(100); + conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100); + + OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl(); + namespacePolicies.setManagedLedgerOffloadThresholdInBytes(200L); + namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(200L); + namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(400L); + namespacePolicies.setManagedLedgerOffloadDriver("s3"); + namespacePolicies.setManagedLedgerOffloadBucket("buck2"); + + admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300); + assertEquals(300, admin.namespaces().getOffloadThresholdInSeconds(myNamespace)); + + admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies); + assertEquals(namespacePolicies, admin.namespaces().getOffloadPolicies(myNamespace)); + + OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl(); + topicPolicies.setManagedLedgerOffloadThresholdInBytes(500L); + topicPolicies.setManagedLedgerOffloadThresholdInSeconds(500L); + topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L); + topicPolicies.setManagedLedgerOffloadDriver("s3"); + topicPolicies.setManagedLedgerOffloadBucket("buck2"); + + String topicName = testTopic + UUID.randomUUID(); + admin.topicPolicies().setOffloadPolicies(topicName, topicPolicies); + + assertEquals(admin.topicPolicies().getOffloadPolicies(topicName).getManagedLedgerOffloadThresholdInSeconds(), + topicPolicies.getManagedLedgerOffloadThresholdInSeconds()); + assertEquals(admin.topicPolicies().getOffloadPolicies(topicName).getManagedLedgerOffloadThresholdInBytes(), + topicPolicies.getManagedLedgerOffloadThresholdInBytes()); + + try { + Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get(10, TimeUnit.SECONDS); + assertNotNull(topic); + + assertTrue(topic instanceof PersistentTopic); + + PersistentTopic persistentTopic = (PersistentTopic) topic; + ManagedLedger ledger = persistentTopic.getManagedLedger(); + ManagedLedgerConfig config = ledger.getConfig(); + OffloadPolicies policies1 = config.getLedgerOffloader().getOffloadPolicies(); + + assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), + topicPolicies.getManagedLedgerOffloadThresholdInBytes()); + assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(), + topicPolicies.getManagedLedgerOffloadThresholdInSeconds()); + } finally { + pulsar.getBrokerService().deleteTopic(topicName, true); + } + } + @Test public void testTopicLevelOffloadPartitioned() throws Exception { testOffload(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index 485d8453e3857..63eeaaab7e6fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -702,7 +702,7 @@ public void testReplicatorOffloadPolicies() throws Exception { init(namespace, persistentTopicName); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST); + 8, 9, 10L, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST); // local try { admin1.topicPolicies().setOffloadPolicies(persistentTopicName, offloadPolicies); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 25eded8a20b9c..6d4889a751d37 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1541,7 +1541,8 @@ public CompletableFuture setOffloadThresholdAsync(String namespace, long o } @Override - public void setOffloadThresholdInSeconds(String namespace, long offloadThresholdInSeconds) throws PulsarAdminException { + public void setOffloadThresholdInSeconds(String namespace, long offloadThresholdInSeconds) + throws PulsarAdminException { sync(() -> setOffloadThresholdInSecondsAsync(namespace, offloadThresholdInSeconds)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index ea6a1777a5ba0..fc72353a209be 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -1758,6 +1758,10 @@ private class SetOffloadPolicies extends CliCommand { , description = "ManagedLedger offload threshold in bytes", required = true) private long offloadThresholdInBytes; + @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} + , description = "ManagedLedger offload threshold in seconds", required = true) + private long offloadThresholdInSeconds; + @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") private Long offloadDeletionLagInMillis; @@ -1799,7 +1803,8 @@ void run() throws PulsarAdminException { s3Role, s3RoleSessionName, awsId, awsSecret, maxBlockSizeInBytes, - readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority); + readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds, + offloadDeletionLagInMillis, offloadedReadPriority); getTopicPolicies(isGlobal).setOffloadPolicies(persistentTopic, offloadPolicies); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 3b6f571333855..2ce71ea13291f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -2150,6 +2150,10 @@ private class SetOffloadPolicies extends CliCommand { , description = "ManagedLedger offload threshold in bytes", required = true) private long offloadThresholdInBytes; + @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} + , description = "ManagedLedger offload threshold in seconds", required = true) + private long offloadThresholdInSeconds; + @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") private Long offloadDeletionLagInMillis; @@ -2186,7 +2190,8 @@ void run() throws PulsarAdminException { s3Role, s3RoleSessionName, awsId, awsSecret, maxBlockSizeInBytes, - readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority); + readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds, + offloadDeletionLagInMillis, offloadedReadPriority); getTopics().setOffloadPolicies(persistentTopic, offloadPolicies); } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java index b10d575f88ca6..86b2ee56c85fe 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java @@ -426,6 +426,7 @@ public void pulsarSplitJsonCodecTest() throws JsonProcessingException, Unsupport 5000, 2000, 1000L, + 1000L, 5000L, OffloadedReadPriority.BOOKKEEPER_FIRST ); From c1b16b3c5d0cb0d7674adcd6707a6a61d6880278 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 27 Oct 2022 16:17:08 +0800 Subject: [PATCH 03/11] fix --- .../org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 58c974ad2923a..36f8d041da811 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -970,6 +970,13 @@ public void topicPolicies() throws Exception { OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, 8, 9, 10L, null, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + + " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -ts 10 -orp tiered-storage-first")); + verify(mockTopicsPolicies) + .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", + OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, + 8, 9, 10L, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getRetention("persistent://myprop/clust/ns1/ds1", false); cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M")); From 2f87e78ff36558689cb7519682c4b4ca5fc25a16 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 1 Nov 2022 16:04:26 +0800 Subject: [PATCH 04/11] review fix --- .../broker/admin/impl/NamespacesBase.java | 44 ++++++++++--------- .../pulsar/broker/admin/v2/Namespaces.java | 21 ++++++--- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 3dacc5471c144..88d1f760db748 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1965,30 +1965,34 @@ protected void internalSetOffloadThreshold(long newThreshold) { } } - protected void internalSetOffloadThresholdInSeconds(long newThreshold) { + protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long newThreshold) { validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); - try { - updatePolicies(namespaceName, policies -> { - if (policies.offload_policies == null) { - policies.offload_policies = new OffloadPoliciesImpl(); - } - ((OffloadPoliciesImpl) policies.offload_policies) - .setManagedLedgerOffloadThresholdInSeconds(newThreshold); - policies.offload_threshold_in_seconds = newThreshold; - return policies; - }); - log.info("[{}] Successfully updated offloadThresholdInSeconds configuration: namespace={}, value={}", - clientAppId(), namespaceName, newThreshold); + CompletableFuture f = new CompletableFuture<>(); + updatePoliciesAsync(namespaceName, + policies -> { + if (policies.offload_policies == null) { + policies.offload_policies = new OffloadPoliciesImpl(); + } + ((OffloadPoliciesImpl) policies.offload_policies) + .setManagedLedgerOffloadThresholdInSeconds(newThreshold); + policies.offload_threshold_in_seconds = newThreshold; + return policies; + }) + .thenAccept(v -> { + log.info("[{}] Successfully updated offloadThresholdInSeconds configuration: namespace={}, value={}", + clientAppId(), namespaceName, newThreshold); + f.complete(null); + }) + .exceptionally(t -> { + log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", + clientAppId(), namespaceName, t); + f.completeExceptionally(new RestException(t)); + return null; + }); - } catch (RestException pfe) { - throw pfe; - } catch (Exception e) { - log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", - clientAppId(), namespaceName, e); - throw new RestException(e); - } + return f; } protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index fb8d65c41ac75..3847ee6d3ab0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2118,18 +2118,25 @@ public void getOffloadThresholdInSeconds( @PUT @Path("/{tenant}/{namespace}/offloadThresholdInSeconds") - @ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic," + @ApiOperation(value = "Set maximum number of seconds stored on the pulsar cluster for a topic," + " before the broker will start offloading to longterm storage", notes = "A negative value disables automatic offloading") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification"), - @ApiResponse(code = 412, message = "offloadThreshold value is not valid") }) - public void setOffloadThresholdInSeconds(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - long newThreshold) { + @ApiResponse(code = 412, message = "offloadThresholdInSeconds value is not valid") }) + public void setOffloadThresholdInSeconds( + @Suspended final AsyncResponse response, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + long newThreshold) { validateNamespaceName(tenant, namespace); - internalSetOffloadThresholdInSeconds(newThreshold); + internalSetOffloadThresholdInSecondsAsync(newThreshold) + .thenAccept(response::resume) + .exceptionally(t -> { + resumeAsyncResponseExceptionally(response, t); + return null; + }); } @GET From 2363e22084584ed22c153a42ba1f1ad561721179 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 3 Nov 2022 01:16:17 +0800 Subject: [PATCH 05/11] Review fix --- .../broker/admin/impl/NamespacesBase.java | 27 ++++++++++--------- .../broker/admin/AdminApiOffloadTest.java | 8 +++--- .../pulsar/broker/admin/NamespacesTest.java | 12 ++++----- .../policies/data/OffloadPoliciesImpl.java | 7 ++++- .../policies/data/OffloadPoliciesTest.java | 5 ++++ 5 files changed, 35 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 88d1f760db748..a2954618ff0a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1966,20 +1966,21 @@ protected void internalSetOffloadThreshold(long newThreshold) { } protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long newThreshold) { - validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - CompletableFuture f = new CompletableFuture<>(); - updatePoliciesAsync(namespaceName, - policies -> { - if (policies.offload_policies == null) { - policies.offload_policies = new OffloadPoliciesImpl(); - } - ((OffloadPoliciesImpl) policies.offload_policies) - .setManagedLedgerOffloadThresholdInSeconds(newThreshold); - policies.offload_threshold_in_seconds = newThreshold; - return policies; - }) + + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE) + .thenApply(v -> validatePoliciesReadOnlyAccessAsync()) + .thenApply(v -> updatePoliciesAsync(namespaceName, + policies -> { + if (policies.offload_policies == null) { + policies.offload_policies = new OffloadPoliciesImpl(); + } + ((OffloadPoliciesImpl) policies.offload_policies) + .setManagedLedgerOffloadThresholdInSeconds(newThreshold); + policies.offload_threshold_in_seconds = newThreshold; + return policies; + }) + ) .thenAccept(v -> { log.info("[{}] Successfully updated offloadThresholdInSeconds configuration: namespace={}, value={}", clientAppId(), namespaceName, newThreshold); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 1746a0e95daef..9abe3e648237d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -289,10 +289,10 @@ public void testSetNamespaceOffloadPolicies() throws Exception { policies.setManagedLedgerOffloadBucket("buck2"); admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300); - assertEquals(300, admin.namespaces().getOffloadThresholdInSeconds(myNamespace)); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300); admin.namespaces().setOffloadPolicies(myNamespace, policies); - assertEquals(policies, admin.namespaces().getOffloadPolicies(myNamespace)); + assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), policies); String topicName = testTopic + UUID.randomUUID(); try { @@ -326,10 +326,10 @@ public void testSetTopicOffloadPolicies() throws Exception { namespacePolicies.setManagedLedgerOffloadBucket("buck2"); admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300); - assertEquals(300, admin.namespaces().getOffloadThresholdInSeconds(myNamespace)); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300); admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies); - assertEquals(namespacePolicies, admin.namespaces().getOffloadPolicies(myNamespace)); + assertEquals(admin.namespaces().getOffloadPolicies(myNamespace),namespacePolicies); OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl(); topicPolicies.setManagedLedgerOffloadThresholdInBytes(500L); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index e65597c1eff46..58b55471a778f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1404,12 +1404,12 @@ public void testSetOffloadThreshold() throws Exception { admin.topics().createNonPartitionedTopic(topicName.toString()); admin.namespaces().setOffloadDeleteLag(namespace, 10000, TimeUnit.SECONDS); - assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); - assertEquals(-1, admin.namespaces().getOffloadThresholdInSeconds(namespace)); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1); // assert we get the default which indicates it will fall back to default - assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); - assertEquals(-1, admin.namespaces().getOffloadThresholdInSeconds(namespace)); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1); // the ledger config should have the expected value ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1431,8 +1431,8 @@ public void testSetOffloadThreshold() throws Exception { // set an override for the namespace admin.namespaces().setOffloadThreshold(namespace, 100); admin.namespaces().setOffloadThresholdInSeconds(namespace, 100); - assertEquals(100, admin.namespaces().getOffloadThreshold(namespace)); - assertEquals(100, admin.namespaces().getOffloadThresholdInSeconds(namespace)); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), 100); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), 100); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); admin.namespaces().getOffloadPolicies(namespace); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 7a7854b62e46f..f415a2b288f9f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -402,7 +402,8 @@ private static void setProperty(Properties properties, String key, Object value) * @return offload policies */ public static OffloadPoliciesImpl oldPoliciesCompatible(OffloadPoliciesImpl nsLevelPolicies, Policies policies) { - if (policies == null || (policies.offload_threshold == -1 && policies.offload_deletion_lag_ms == null)) { + if (policies == null || (policies.offload_threshold == -1 && policies.offload_threshold_in_seconds == -1 + && policies.offload_deletion_lag_ms == null)) { return nsLevelPolicies; } if (nsLevelPolicies == null) { @@ -412,6 +413,10 @@ public static OffloadPoliciesImpl oldPoliciesCompatible(OffloadPoliciesImpl nsLe && policies.offload_threshold != -1) { nsLevelPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold); } + if (nsLevelPolicies.getManagedLedgerOffloadThresholdInSeconds() == null + && policies.offload_threshold_in_seconds != -1) { + nsLevelPolicies.setManagedLedgerOffloadThresholdInSeconds(policies.offload_threshold_in_seconds); + } if (nsLevelPolicies.getManagedLedgerOffloadDeletionLagInMillis() == null && policies.offload_deletion_lag_ms != null) { nsLevelPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index 38e85d8047981..92633b129ce55 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -261,22 +261,27 @@ public void compatibleWithConfigFileTest() { public void oldPoliciesCompatibleTest() { Policies policies = new Policies(); Assert.assertEquals(policies.offload_threshold, -1); + Assert.assertEquals(policies.offload_threshold_in_seconds, -1); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.oldPoliciesCompatible(null, policies); Assert.assertNull(offloadPolicies); policies.offload_deletion_lag_ms = 1000L; policies.offload_threshold = 0; + policies.offload_threshold_in_seconds = 0; offloadPolicies = OffloadPoliciesImpl.oldPoliciesCompatible(offloadPolicies, policies); Assert.assertNotNull(offloadPolicies); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), Long.valueOf(1000)); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(0)); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), Long.valueOf(0)); policies.offload_deletion_lag_ms = 2000L; policies.offload_threshold = 100; + policies.offload_threshold_in_seconds = 100; offloadPolicies = OffloadPoliciesImpl.oldPoliciesCompatible(offloadPolicies, policies); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), Long.valueOf(1000)); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(0)); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), Long.valueOf(0)); } @Test From 9f9afd008755cafa063631836d272b8eb4e00840 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 15 Nov 2022 16:37:39 +0800 Subject: [PATCH 06/11] fix checkstyle --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 42301199d5ad1..3a81aefbf359d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1984,8 +1984,8 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long }) ) .thenAccept(v -> { - log.info("[{}] Successfully updated offloadThresholdInSeconds configuration: namespace={}, value={}", - clientAppId(), namespaceName, newThreshold); + log.info("[{}] Successfully updated offloadThresholdInSeconds configuration:" + + " namespace={}, value={}", clientAppId(), namespaceName, newThreshold); f.complete(null); }) .exceptionally(t -> { From 5520e2b366723ba56def9ba79461f8a66ea33ad4 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 16 Nov 2022 11:25:11 +0800 Subject: [PATCH 07/11] fix checkstyle --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 3a81aefbf359d..04fdc6d5104e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1984,8 +1984,8 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long }) ) .thenAccept(v -> { - log.info("[{}] Successfully updated offloadThresholdInSeconds configuration:" + - " namespace={}, value={}", clientAppId(), namespaceName, newThreshold); + log.info("[{}] Successfully updated offloadThresholdInSeconds configuration:" + + " namespace={}, value={}", clientAppId(), namespaceName, newThreshold); f.complete(null); }) .exceptionally(t -> { From b26a7b89b6a23e4ceb63996f22ce9712ee91e501 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 16 Nov 2022 20:26:22 +0800 Subject: [PATCH 08/11] fix tests --- .../pulsar/admin/cli/PulsarAdminToolTest.java | 13 +++---------- .../apache/pulsar/admin/cli/CmdTopicPolicies.java | 4 ++-- .../java/org/apache/pulsar/admin/cli/CmdTopics.java | 4 ++-- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index e99f7da70ece3..aedc308a55db0 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -814,7 +814,7 @@ public void namespaces() throws Exception { verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1"); namespaces.run(split( - "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -aots 100 -oae 10s -orp tiered-storage-first")); + "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 10s -orp tiered-storage-first")); verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, @@ -973,13 +973,6 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1"); - cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + - " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first")); - verify(mockTopicsPolicies) - .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", - OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); - cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -ts 10 -orp tiered-storage-first")); verify(mockTopicsPolicies) @@ -1431,7 +1424,7 @@ public void topicPolicies() throws Exception { verify(mockGlobalTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + - " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -oats 100 -orp tiered-storage-first -g")); + " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -ts 100 -orp tiered-storage-first -g")); verify(mockGlobalTopicsPolicies) .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, @@ -1728,7 +1721,7 @@ public void topics() throws Exception { cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ; - cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -oats 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first")); + cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -ts 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first")); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, 8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 5eb0806f89b1b..98e6530082634 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -1764,8 +1764,8 @@ private class SetOffloadPolicies extends CliCommand { private long offloadThresholdInBytes; @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} - , description = "ManagedLedger offload threshold in seconds", required = true) - private long offloadThresholdInSeconds; + , description = "ManagedLedger offload threshold in seconds") + private Long offloadThresholdInSeconds; @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 2ce71ea13291f..60aa649737878 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -2151,8 +2151,8 @@ private class SetOffloadPolicies extends CliCommand { private long offloadThresholdInBytes; @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} - , description = "ManagedLedger offload threshold in seconds", required = true) - private long offloadThresholdInSeconds; + , description = "ManagedLedger offload threshold in seconds") + private Long offloadThresholdInSeconds; @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") From d172d847dc507b987677122b9dc57956e602c83a Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 18 Nov 2022 01:53:05 +0800 Subject: [PATCH 09/11] fix tests --- .../broker/admin/AdminApiOffloadTest.java | 63 +++++++------------ 1 file changed, 22 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 9abe3e648237d..99b2cf2ed7e75 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -293,24 +293,6 @@ public void testSetNamespaceOffloadPolicies() throws Exception { admin.namespaces().setOffloadPolicies(myNamespace, policies); assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), policies); - - String topicName = testTopic + UUID.randomUUID(); - try { - Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get(10, TimeUnit.SECONDS); - assertNotNull(topic); - - assertTrue(topic instanceof PersistentTopic); - - PersistentTopic persistentTopic = (PersistentTopic) topic; - ManagedLedger ledger = persistentTopic.getManagedLedger(); - ManagedLedgerConfig config = ledger.getConfig(); - OffloadPolicies policies1 = config.getLedgerOffloader().getOffloadPolicies(); - - assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), policies.getManagedLedgerOffloadThresholdInBytes()); - assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(), policies.getManagedLedgerOffloadThresholdInSeconds()); - } finally { - pulsar.getBrokerService().deleteTopic(topicName, true); - } } @Test @@ -318,6 +300,10 @@ public void testSetTopicOffloadPolicies() throws Exception { conf.setManagedLedgerOffloadThresholdInSeconds(100); conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100); + LedgerOffloader topicOffloader = mock(LedgerOffloader.class); + when(topicOffloader.getOffloadDriverName()).thenReturn("mock"); + doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any()); + OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl(); namespacePolicies.setManagedLedgerOffloadThresholdInBytes(200L); namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(200L); @@ -329,7 +315,7 @@ public void testSetTopicOffloadPolicies() throws Exception { assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300); admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies); - assertEquals(admin.namespaces().getOffloadPolicies(myNamespace),namespacePolicies); + assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), namespacePolicies); OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl(); topicPolicies.setManagedLedgerOffloadThresholdInBytes(500L); @@ -339,30 +325,25 @@ public void testSetTopicOffloadPolicies() throws Exception { topicPolicies.setManagedLedgerOffloadBucket("buck2"); String topicName = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topicName); admin.topicPolicies().setOffloadPolicies(topicName, topicPolicies); - assertEquals(admin.topicPolicies().getOffloadPolicies(topicName).getManagedLedgerOffloadThresholdInSeconds(), - topicPolicies.getManagedLedgerOffloadThresholdInSeconds()); - assertEquals(admin.topicPolicies().getOffloadPolicies(topicName).getManagedLedgerOffloadThresholdInBytes(), - topicPolicies.getManagedLedgerOffloadThresholdInBytes()); - - try { - Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get(10, TimeUnit.SECONDS); - assertNotNull(topic); - - assertTrue(topic instanceof PersistentTopic); - - PersistentTopic persistentTopic = (PersistentTopic) topic; - ManagedLedger ledger = persistentTopic.getManagedLedger(); - ManagedLedgerConfig config = ledger.getConfig(); - OffloadPolicies policies1 = config.getLedgerOffloader().getOffloadPolicies(); - - assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), - topicPolicies.getManagedLedgerOffloadThresholdInBytes()); - assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(), - topicPolicies.getManagedLedgerOffloadThresholdInSeconds()); - } finally { - pulsar.getBrokerService().deleteTopic(topicName, true); + // Wait until broker update policies finished. + for (int a = 1; a <= 5; a++) { + try { + OffloadPolicies policies = admin.topicPolicies().getOffloadPolicies(topicName); + + assertEquals(policies.getManagedLedgerOffloadThresholdInSeconds(), + topicPolicies.getManagedLedgerOffloadThresholdInSeconds()); + assertEquals(policies.getManagedLedgerOffloadThresholdInBytes(), + topicPolicies.getManagedLedgerOffloadThresholdInBytes()); + } catch (Exception e) { + if (a == 5) { + throw e; + } else { + Thread.sleep(1000L); + } + } } } From a22d3b35c1d3bd41bbcb8695779c2b623682bc53 Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 18 Nov 2022 02:07:22 +0800 Subject: [PATCH 10/11] fix checkstyle --- .../org/apache/pulsar/broker/admin/AdminApiOffloadTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 99b2cf2ed7e75..604bc437f1963 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -52,14 +52,9 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.mledger.LedgerOffloader; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; From 77b8032c70c4b07b5fcafa59498d59ce769a4a5d Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 18 Nov 2022 11:04:37 +0800 Subject: [PATCH 11/11] fix tests --- .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 073ea5503b60a..b6bf1f0927cc6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2105,7 +2105,8 @@ public void getOffloadThresholdInSeconds( validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenAccept(policies -> { - if (policies.offload_policies == null) { + if (policies.offload_policies == null + || policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds() == null) { asyncResponse.resume(policies.offload_threshold_in_seconds); } else { asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds());