diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 0c93a5b642cf6..8c45b79bcbf5a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,6 +85,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + private long managedLedgerOffloadFlowPermitsPerSecond = -1; @Getter @Setter @@ -752,5 +753,23 @@ public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } + /** + * Set permitted size to offload on the broker. + * + * @param managedLedgerOffloadBrokerFlowPermit + */ + public void setManagedLedgerOffloadFlowPermitsPerSecond(long managedLedgerOffloadBrokerFlowPermit) { + this.managedLedgerOffloadFlowPermitsPerSecond = managedLedgerOffloadBrokerFlowPermit; + } + + /** + * Get permitted size to offload on the broker. + * + * @return + */ + public long getManagedLedgerOffloadFlowPermitsPerSecond() { + return managedLedgerOffloadFlowPermitsPerSecond; + } + public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java new file mode 100644 index 0000000000000..9a2e1a06da393 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; + +/** + * OffloadReadHandle is a wrapper of ReadHandle to offload read operations. + */ +public final class OffloadReadHandle implements ReadHandle { + private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); + private static volatile long flowPermits = -1L; + private static volatile AsyncTokenBucket tokenBucket; + + private final ReadHandle delegate; + private final long averageEntrySize; + + private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) { + initialize(config); + this.delegate = Objects.requireNonNull(handle); + Objects.requireNonNull(ledgerInfo); + long averageEntrySize = ledgerInfo.getSize() / ledgerInfo.getEntries(); + if (averageEntrySize <= 0) { + averageEntrySize = 1; + } + this.averageEntrySize = averageEntrySize; + } + + private static void initialize(ManagedLedgerConfig config) { + if (INITIALIZED.compareAndSet(false, true)) { + flowPermits = config.getManagedLedgerOffloadFlowPermitsPerSecond(); + if (flowPermits > 0) { + tokenBucket = AsyncTokenBucket.builder().initialTokens(0).capacity(2 * flowPermits) + .rate(flowPermits).build(); + } + } + } + + public static CompletableFuture create(ReadHandle handle, ManagedLedgerConfig config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) { + return CompletableFuture.completedFuture(new OffloadReadHandle(handle, config, ledgerInfo)); + } + + @Override + public CompletableFuture readAsync(long firstEntry, long lastEntry) { + long numEntries = lastEntry - firstEntry + 1; + long numBytes = numEntries * averageEntrySize; + + long delayMillis; + // block the offloader thread if the flow control permits is exceeded. + while ((delayMillis = calculateDelayMillis(numBytes)) > 0) { + try { + Thread.sleep(delayMillis); + } catch (InterruptedException ex) { + return CompletableFuture.failedFuture(ex); + } + } + + return delegate.readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { + return this.delegate.readUnconfirmedAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture readLastAddConfirmedAsync() { + return this.delegate.readLastAddConfirmedAsync(); + } + + @Override + public CompletableFuture tryReadLastAddConfirmedAsync() { + return this.delegate.tryReadLastAddConfirmedAsync(); + } + + @Override + public long getLastAddConfirmed() { + return this.delegate.getLastAddConfirmed(); + } + + @Override + public long getLength() { + return this.delegate.getLength(); + } + + @Override + public boolean isClosed() { + return this.delegate.isClosed(); + } + + @Override + public CompletableFuture readLastAddConfirmedAndEntryAsync( + long entryId, long timeOutInMillis, boolean parallel) { + return this.delegate.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); + } + + @Override + public long getId() { + return this.delegate.getId(); + } + + @Override + public CompletableFuture closeAsync() { + return this.delegate.closeAsync(); + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return this.delegate.getLedgerMetadata(); + } + + + private static synchronized long calculateDelayMillis(long numBytes) { + if (flowPermits <= 0) { + return 0; + } + if (numBytes <= 0) { + return 0; + } + + if (tokenBucket.containsTokens(true)) { + long token = tokenBucket.getTokens(); + if (token > 0) { + // To prevent flowPermits is less than each batch size. + tokenBucket.consumeTokens(numBytes); + return 0; + } + } + + return TimeUnit.NANOSECONDS.toMillis(tokenBucket.calculateThrottlingDuration()); + } + + @VisibleForTesting + public void reset() { + INITIALIZED.set(false); + flowPermits = -1L; + tokenBucket = null; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e5e163127f7b6..7de779e81aa60 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -111,6 +111,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.OffloadReadHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; @@ -3255,6 +3256,7 @@ void offloadLoop(CompletableFuture promise, Queue ledg prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) .thenCompose((ignore) -> getLedgerHandle(ledgerId)) + .thenCompose(h -> OffloadReadHandle.create(h, config, info)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java new file mode 100644 index 0000000000000..174252c647679 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class OffloadReadHandleTest extends MockedBookKeeperTestCase { + + @DataProvider(name = "flowPermits") + public Object[][] permits() { + return new Object[][]{ + {-1L}, + {0L}, + {50L}, + {100L}, + {10000L} + }; + } + + @Test(dataProvider = "flowPermits") + public void testFlowPermits(long flowPermits) throws Exception { + OffloadReadHandle handle = (OffloadReadHandle) initializeReadHandle(flowPermits); + try { + long start = System.currentTimeMillis(); + handle.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); + + long actualDuration = System.currentTimeMillis() - start; + if (flowPermits <= 0L) { + Assert.assertEquals(actualDuration, 4000D, 4000D); + } else if (flowPermits == 50L) { + long expectDuration = 8000; + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } else if (flowPermits == 100L) { + long expectDuration = 4000; + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } else if (flowPermits == 10000L) { + Assert.assertEquals(actualDuration, 1000D, 1000D); + } + } finally { + handle.close(); + handle.reset(); + } + } + + + @Test + public void testOffloadFlowPermitsMultiThreads() throws Exception { + OffloadReadHandle handle = (OffloadReadHandle) initializeReadHandle(1000); + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(10); + try { + long start = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) { + new Thread(() -> { + for (int j = 0; j < 10; j++) { + try { + handle.read(1, 1); + } catch (Exception e) { + failed.set(true); + } + } + latch.countDown(); + }).start(); + } + + latch.await(); + + Assert.assertFalse(failed.get()); + long actualDuration = System.currentTimeMillis() - start; + long expectDuration = TimeUnit.SECONDS.toMillis(10); + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } finally { + handle.close(); + handle.reset(); + } + } + + + private ReadHandle initializeReadHandle(long flowPermits) throws Exception { + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100); + for (int a = 0; a < 100; a++) { + buf.writeByte(0); + } + LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf); + List entryList = Lists.newArrayList(entry); + LedgerEntries entries = LedgerEntriesImpl.create(entryList); + ReadHandle handle = Mockito.mock(ReadHandle.class); + Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle) + .readAsync(Mockito.anyLong(), Mockito.anyLong()); + Mockito.doAnswer(inv -> { + entries.close(); + return CompletableFuture.completedFuture(null); + }).when(handle).closeAsync(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits); + + CompletableFuture future = OffloadReadHandle.create(handle, config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(1) + .setEntries(1).setSize(100).build()); + + return future.get(); + } +} \ No newline at end of file diff --git a/microbench/pom.xml b/microbench/pom.xml index a568e716ba0fa..b7a2403694e41 100644 --- a/microbench/pom.xml +++ b/microbench/pom.xml @@ -94,7 +94,7 @@ ${project.groupId} - pulsar-broker + pulsar-common ${project.version} diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBenchmark.java similarity index 98% rename from microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java rename to microbench/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBenchmark.java index 4c069e72ea3ba..a2bd07763d9b4 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBenchmark.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/package-info.java b/microbench/src/main/java/org/apache/pulsar/common/util/qos/package-info.java similarity index 95% rename from microbench/src/main/java/org/apache/pulsar/broker/qos/package-info.java rename to microbench/src/main/java/org/apache/pulsar/common/util/qos/package-info.java index ccea21a210f86..9e8e31216d2b7 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/package-info.java +++ b/microbench/src/main/java/org/apache/pulsar/common/util/qos/package-info.java @@ -19,4 +19,4 @@ /** * Benchmarks for Pulsar broker Quality of Service (QoS) related classes. */ -package org.apache.pulsar.broker.qos; \ No newline at end of file +package org.apache.pulsar.common.util.qos; \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 156c83bd6960c..8a8bacf0a342d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2087,6 +2087,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "The threshold to triggering automatic offload to long term storage" ) private long managedLedgerOffloadThresholdInSeconds = -1L; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The number of bytes permitted per second to offload on this broker" + ) + private long managedLedgerOffloadFlowPermitsPerSecond = -1; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a cursor ledger" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 58d7e71b65d84..f4e2f8bb65065 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -92,8 +92,6 @@ import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; -import org.apache.pulsar.broker.qos.DefaultMonotonicSnapshotClock; -import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager; @@ -152,6 +150,8 @@ import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.common.util.ThreadDumpUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.apache.pulsar.common.util.qos.DefaultMonotonicSnapshotClock; +import org.apache.pulsar.common.util.qos.MonotonicSnapshotClock; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index a733db555a351..332960438cb4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -18,12 +18,12 @@ */ package org.apache.pulsar.broker.resourcegroup; -import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.apache.pulsar.common.util.qos.MonotonicSnapshotClock; public class ResourceGroupPublishLimiter extends PublishRateLimiterImpl { private volatile long publishMaxMessageRate; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index dff6c40054060..89013f1ae2bdc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1956,6 +1956,8 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + managedLedgerConfig.setManagedLedgerOffloadFlowPermitsPerSecond( + serviceConfig.getManagedLedgerOffloadFlowPermitsPerSecond()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 8255d9b6931ff..ba182d88896fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -25,10 +25,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; -import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.MonotonicSnapshotClock; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index b29cbcd660db1..22f9d29641c7a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -22,12 +22,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index b1de10e73b76f..459a25198efda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -24,11 +24,11 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java index 39be56e3f41cf..1461ed2d705c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java @@ -26,7 +26,7 @@ import static org.testng.Assert.assertEquals; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java index ec952a7ca7734..12e1f362990cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.testng.annotations.Test; public class PublishRateLimiterDisableTest { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 747ef3b7f5ce8..9a18521cc0a73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -33,7 +33,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.DispatchRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java index 721d049342552..8572b89060e91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeoutException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.policies.data.PublishRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 402b5c4972ce2..4c6b06d5d0234 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -54,7 +54,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 360d27f64133d..073606245c58d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -49,7 +49,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java index 1c0ae5547d53b..ba07af58c94cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java @@ -34,7 +34,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.common.policies.data.PublishRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucket.java similarity index 99% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucket.java index ac9a1f03e592b..a95ea134490b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucket.java @@ -16,8 +16,7 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBuilder.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBuilder.java index ee256d5a37d64..4a1135efeb57a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; // CHECKSTYLE.OFF: ClassTypeParameterName public abstract class AsyncTokenBucketBuilder> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DefaultMonotonicSnapshotClock.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DefaultMonotonicSnapshotClock.java index df3843921ed55..41a037380ad14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DefaultMonotonicSnapshotClock.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; @@ -51,7 +51,6 @@ public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier cl thread.start(); } - /** {@inheritDoc} */ @Override public long getTickNanos(boolean requestSnapshot) { if (requestSnapshot) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucket.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucket.java index 8edc73d1f51e3..b04875889a4ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucket.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.function.LongSupplier; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucketBuilder.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucketBuilder.java index 22270484c72f0..68c5455fcdede 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.function.LongSupplier; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucket.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucket.java index 627c5ee1334b2..f222a50abeaa7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucket.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; /** * A subclass of {@link AsyncTokenBucket} that represents a token bucket with a rate which is final. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucketBuilder.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucketBuilder.java index ff4ed53c6c7fa..b8c91ad708786 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; /** * A builder class for creating instances of {@link FinalRateAsyncTokenBucket}. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/MonotonicSnapshotClock.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/MonotonicSnapshotClock.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/MonotonicSnapshotClock.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/MonotonicSnapshotClock.java index 8f61bd5125b5f..c326b7b9a1290 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/MonotonicSnapshotClock.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/MonotonicSnapshotClock.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; /** * An interface representing a clock that provides a monotonic counter in nanoseconds. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/package-info.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/package-info.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/package-info.java index 1078d86894efe..9c4ab0353dcb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/package-info.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/package-info.java @@ -19,4 +19,4 @@ /** * Pulsar broker Quality of Service (QoS) related classes. */ -package org.apache.pulsar.broker.qos; \ No newline at end of file +package org.apache.pulsar.common.util.qos; \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketTest.java index b446f9e902f2a..d3c53aad50255 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import static org.testng.Assert.assertEquals; import java.util.concurrent.TimeUnit;