From 6bf49d157b3887217055d11d12dc2a6e90ce938a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 29 Mar 2024 20:31:13 +0800 Subject: [PATCH 1/7] Introduce Offload threshold --- .../mledger/ManagedLedgerConfig.java | 19 ++ .../bookkeeper/mledger/OffloadReadHandle.java | 174 ++++++++++++++++++ .../mledger/impl/ManagedLedgerImpl.java | 11 +- .../mledger/OffloadReadHandleTest.java | 102 ++++++++++ .../pulsar/broker/ServiceConfiguration.java | 5 + .../pulsar/broker/service/BrokerService.java | 2 + .../PrometheusMetricsGenerator.java | 4 +- .../broker/stats/PrometheusMetricsTest.java | 1 + .../pulsar/common/util}/TimeWindow.java | 72 ++++++-- .../pulsar/common/util}/WindowWrap.java | 2 +- .../pulsar/common/util}/TimeWindowTest.java | 2 +- 11 files changed, 366 insertions(+), 28 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java rename {pulsar-broker/src/main/java/org/apache/pulsar/broker/stats => pulsar-common/src/main/java/org/apache/pulsar/common/util}/TimeWindow.java (53%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker/stats => pulsar-common/src/main/java/org/apache/pulsar/common/util}/WindowWrap.java (97%) rename {pulsar-broker/src/test/java/org/apache/pulsar/broker/stats => pulsar-common/src/test/java/org/apache/pulsar/common/util}/TimeWindowTest.java (98%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 0c93a5b642cf6..8c45b79bcbf5a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,6 +85,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + private long managedLedgerOffloadFlowPermitsPerSecond = -1; @Getter @Setter @@ -752,5 +753,23 @@ public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } + /** + * Set permitted size to offload on the broker. + * + * @param managedLedgerOffloadBrokerFlowPermit + */ + public void setManagedLedgerOffloadFlowPermitsPerSecond(long managedLedgerOffloadBrokerFlowPermit) { + this.managedLedgerOffloadFlowPermitsPerSecond = managedLedgerOffloadBrokerFlowPermit; + } + + /** + * Get permitted size to offload on the broker. + * + * @return + */ + public long getManagedLedgerOffloadFlowPermitsPerSecond() { + return managedLedgerOffloadFlowPermitsPerSecond; + } + public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java new file mode 100644 index 0000000000000..bc6f6a6a9bd78 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.common.util.TimeWindow; +import org.apache.pulsar.common.util.WindowWrap; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * OffloadReadHandle is a wrapper of ReadHandle to offload read operations. + */ +public final class OffloadReadHandle implements ReadHandle { + private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); + private static volatile long flowPermits = -1L; + private static volatile TimeWindow window; + + private final ReadHandle delegate; + private final long averageEntrySize; + + private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) { + initialize(config); + this.delegate = Objects.requireNonNull(handle); + Objects.requireNonNull(ledgerInfo); + long averageEntrySize = ledgerInfo.getSize() / ledgerInfo.getEntries(); + if (averageEntrySize <= 0) { + averageEntrySize = 1; + } + this.averageEntrySize = averageEntrySize; + } + + private static void initialize(ManagedLedgerConfig config) { + if (INITIALIZED.compareAndSet(false, true)) { + flowPermits = config.getManagedLedgerOffloadFlowPermitsPerSecond(); + window = new TimeWindow<>(2, 1000); + } + } + + public static CompletableFuture create(ReadHandle handle, ManagedLedgerConfig config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) { + return CompletableFuture.completedFuture(new OffloadReadHandle(handle, config, ledgerInfo)); + } + + @Override + public CompletableFuture readAsync(long firstEntry, long lastEntry) { + long numEntries = lastEntry - firstEntry + 1; + long numBytes = numEntries * averageEntrySize; + + long delayMillis; + // block the offloader thread if the flow control permits is exceeded. + while ((delayMillis = calculateDelayMillis(numBytes)) > 0) { + try { + Thread.sleep(delayMillis); + } catch (InterruptedException ex) { + return CompletableFuture.failedFuture(ex); + } + } + + return delegate.readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { + return this.delegate.readUnconfirmedAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture readLastAddConfirmedAsync() { + return this.delegate.readLastAddConfirmedAsync(); + } + + @Override + public CompletableFuture tryReadLastAddConfirmedAsync() { + return this.delegate.tryReadLastAddConfirmedAsync(); + } + + @Override + public long getLastAddConfirmed() { + return this.delegate.getLastAddConfirmed(); + } + + @Override + public long getLength() { + return this.delegate.getLength(); + } + + @Override + public boolean isClosed() { + return this.delegate.isClosed(); + } + + @Override + public CompletableFuture readLastAddConfirmedAndEntryAsync( + long entryId, long timeOutInMillis, boolean parallel) { + return this.delegate.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); + } + + @Override + public long getId() { + return this.delegate.getId(); + } + + @Override + public CompletableFuture closeAsync() { + return this.delegate.closeAsync(); + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return this.delegate.getLedgerMetadata(); + } + + + private static synchronized long calculateDelayMillis(long numBytes) { + if (flowPermits <= 0) { + return 0; + } + if (numBytes <= 0) { + return 0; + } + + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); + if (wrap == null) { + // it should never goes here + return 0; + } + AtomicLong counter = wrap.value(); + + long delayMillis = 0; + // Cannot use `counter.addAndGet(bytes) >= flowPermits` directly. + // If flowPermits is less than the bytes of a single entry or a read request, + // the read request will be blocked forever. + if (counter.get() >= flowPermits) { + // park until next window start + long end = wrap.start() + wrap.interval(); + delayMillis = end - System.currentTimeMillis(); + } + counter.addAndGet(numBytes); + return delayMillis; + } + + @VisibleForTesting + public void reset() { + INITIALIZED.set(false); + flowPermits = -1L; + window = null; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0f089ef4a8573..88b487509a825 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -80,7 +80,7 @@ import org.apache.bookkeeper.common.util.Backoff; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; -import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.*; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -92,11 +92,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; @@ -110,9 +105,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; -import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; @@ -3245,6 +3237,7 @@ void offloadLoop(CompletableFuture promise, Queue ledg prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) .thenCompose((ignore) -> getLedgerHandle(ledgerId)) + .thenCompose(h -> OffloadReadHandle.create(h, config, info)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java new file mode 100644 index 0000000000000..fe2cbd6a3d1b6 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class OffloadReadHandleTest extends MockedBookKeeperTestCase { + + @DataProvider(name = "flowPermits") + public Object[][] permits() { + return new Object[][]{ + {-1L}, + {0L}, + {50}, + {100L}, + {10000L} + }; + } + + @Test(dataProvider = "flowPermits") + public void testFlowPermits(long flowPermits) throws Exception { + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100); + for (int a = 0; a < 100; a++) { + buf.writeByte(0); + } + LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf); + List entryList = Lists.newArrayList(entry); + OffloadReadHandle h = null; + try (LedgerEntries entries = LedgerEntriesImpl.create(entryList)) { + ReadHandle handle = Mockito.mock(ReadHandle.class); + Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle) + .readAsync(Mockito.anyLong(), Mockito.anyLong()); + + long start = System.currentTimeMillis(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits); + + CompletableFuture future = OffloadReadHandle.create(handle, config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(1) + .setEntries(1).setSize(100).build()); + h = (OffloadReadHandle) future.get(); + + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + + long actualDuration = System.currentTimeMillis() - start; + if (flowPermits <= 0L) { + Assert.assertEquals(actualDuration, 1000D, 1000D); + } else if (flowPermits == 100L || flowPermits == 50L) { + long expectDuration = TimeUnit.SECONDS.toMillis(8); + Assert.assertTrue(actualDuration >= expectDuration); + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } else if (flowPermits == 10000L) { + Assert.assertEquals(actualDuration, 1000D, 1000D); + } + } finally { + if (null != h) { + h.reset(); + } + } + } +} \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e088f50a05c88..f8907bda7f3a5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2057,6 +2057,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "The threshold to triggering automatic offload to long term storage" ) private long managedLedgerOffloadThresholdInSeconds = -1L; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The number of bytes permitted per second to offload on this broker" + ) + private long managedLedgerOffloadFlowPermitsPerSecond = -1; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a cursor ledger" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 98a0ed95b1a45..ef365b57e5590 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1937,6 +1937,8 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + managedLedgerConfig.setManagedLedgerOffloadFlowPermitsPerSecond( + serviceConfig.getManagedLedgerOffloadFlowPermitsPerSecond()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 124f0d3e54e4f..d99c91d2f3dd1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -44,8 +44,8 @@ import org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.TimeWindow; -import org.apache.pulsar.broker.stats.WindowWrap; +import org.apache.pulsar.common.util.TimeWindow; +import org.apache.pulsar.common.util.WindowWrap; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index d3891931496c5..4edcb894c5854 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.TimeWindow; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.zookeeper.CreateMode; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TimeWindow.java similarity index 53% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/TimeWindow.java index 08730189322ee..aaa1865a3971b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TimeWindow.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.pulsar.common.util; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public final class TimeWindow { @@ -26,51 +28,91 @@ public final class TimeWindow { private final int sampleCount; private final AtomicReferenceArray> array; + private final Lock updateLock = new ReentrantLock(); + public TimeWindow(int sampleCount, int interval) { this.sampleCount = sampleCount; this.interval = interval; this.array = new AtomicReferenceArray<>(sampleCount); } + + public WindowWrap current(Function function) { + return current(function, System.currentTimeMillis()); + } + + /** * return current time window data. * * @param function generate data. * @return */ - public synchronized WindowWrap current(Function function) { - long millis = System.currentTimeMillis(); - - if (millis < 0) { + public WindowWrap current(Function function, long timeMillis) { + if (timeMillis < 0) { return null; } - int idx = calculateTimeIdx(millis); - long windowStart = calculateWindowStart(millis); + + int idx = calculateTimeIdx(timeMillis); + // Calculate current bucket start time. + long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap old = array.get(idx); if (old == null) { - WindowWrap window = new WindowWrap<>(interval, windowStart, null); + WindowWrap window = new WindowWrap<>(interval, windowStart, function.apply(null)); if (array.compareAndSet(idx, null, window)) { - T value = null == function ? null : function.apply(null); - window.value(value); return window; } else { + // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.start()) { return old; } else if (windowStart > old.start()) { - T value = null == function ? null : function.apply(old.value()); - old.value(value); - old.resetWindowStart(windowStart); - return old; + if (updateLock.tryLock()) { + try { + // Successfully get the update lock, now we reset the bucket. + T value = null == function ? null : function.apply(old.value()); + old.value(value); + old.resetWindowStart(windowStart); + return old; + } finally { + updateLock.unlock(); + } + } else { + // Contention failed, the thread will yield its time slice to wait for bucket available. + Thread.yield(); + } } else { - //it should never goes here + //when windowStart < old.value() + // Should not go through here, as the provided time is already behind. throw new IllegalStateException(); } } } + /** + * return next time window data. + * + * @param function generate data. + */ + public WindowWrap next(Function function, long timeMillis) { + if (this.sampleCount <= 1) { + throw new IllegalStateException("Argument sampleCount cannot less than 2"); + } + return this.current(function, timeMillis + interval); + } + + + /** + * return next time window data. + * + * @param function generate data. + */ + public WindowWrap next(Function function) { + return this.next(function, System.currentTimeMillis()); + } + private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / this.interval; return (int) (timeId % sampleCount); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/WindowWrap.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/WindowWrap.java index 12869b82921e5..29d3a871132b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/WindowWrap.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.pulsar.common.util; public final class WindowWrap { private final long interval; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/TimeWindowTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/TimeWindowTest.java index 89528c1965397..6df12edd73fb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/TimeWindowTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.pulsar.common.util; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; From 5b83d245547bc645ac1b799fb8578c2e72cb8031 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 29 Mar 2024 20:46:24 +0800 Subject: [PATCH 2/7] fix codestyle --- .../apache/bookkeeper/mledger/OffloadReadHandle.java | 9 ++++----- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 ++++++++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index bc6f6a6a9bd78..e71aa821c5c55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -19,6 +19,10 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.VisibleForTesting; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -27,11 +31,6 @@ import org.apache.pulsar.common.util.TimeWindow; import org.apache.pulsar.common.util.WindowWrap; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * OffloadReadHandle is a wrapper of ReadHandle to offload read operations. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 88b487509a825..032850514fcfc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -80,7 +80,7 @@ import org.apache.bookkeeper.common.util.Backoff; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; -import org.apache.bookkeeper.mledger.*; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -92,6 +92,11 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; @@ -105,6 +110,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; +import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.OffloadReadHandle; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; From 5a4be32eee683892ed8df5f396dc3a93374d3ebf Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 29 Mar 2024 22:51:43 +0800 Subject: [PATCH 3/7] Address comments --- .../bookkeeper/mledger/OffloadReadHandle.java | 47 +++++++++---------- .../mledger/OffloadReadHandleTest.java | 2 - .../broker/qos/AsyncTokenBucketBenchmark.java | 3 ++ .../apache/pulsar/broker/PulsarService.java | 4 +- .../ResourceGroupPublishLimiter.java | 2 +- .../service/PublishRateLimiterImpl.java | 4 +- .../persistent/DispatchRateLimiter.java | 2 +- .../persistent/SubscribeRateLimiter.java | 2 +- .../pulsar/broker/stats}/TimeWindow.java | 2 +- .../pulsar/broker/stats}/WindowWrap.java | 2 +- .../PrometheusMetricsGenerator.java | 4 +- .../broker/service/AbstractTopicTest.java | 2 +- .../PublishRateLimiterDisableTest.java | 2 +- .../service/ReplicatorRateLimiterTest.java | 2 +- .../service/TopicPublishRateThrottleTest.java | 2 +- .../persistent/MessageDuplicationTest.java | 2 +- .../broker/stats/PrometheusMetricsTest.java | 1 - .../pulsar/broker/stats}/TimeWindowTest.java | 5 +- .../api/MessageDispatchThrottlingTest.java | 2 +- .../impl/MessagePublishThrottlingTest.java | 2 +- .../common/util}/qos/AsyncTokenBucket.java | 3 +- .../util}/qos/AsyncTokenBucketBuilder.java | 2 +- .../qos/DefaultMonotonicSnapshotClock.java | 3 +- .../qos/DynamicRateAsyncTokenBucket.java | 2 +- .../DynamicRateAsyncTokenBucketBuilder.java | 2 +- .../util}/qos/FinalRateAsyncTokenBucket.java | 2 +- .../qos/FinalRateAsyncTokenBucketBuilder.java | 2 +- .../util}/qos/MonotonicSnapshotClock.java | 2 +- .../pulsar/common/util}/qos/package-info.java | 2 +- .../util}/qos/AsyncTokenBucketTest.java | 2 +- 30 files changed, 56 insertions(+), 60 deletions(-) rename {pulsar-common/src/main/java/org/apache/pulsar/common/util => pulsar-broker/src/main/java/org/apache/pulsar/broker/stats}/TimeWindow.java (99%) rename {pulsar-common/src/main/java/org/apache/pulsar/common/util => pulsar-broker/src/main/java/org/apache/pulsar/broker/stats}/WindowWrap.java (97%) rename {pulsar-common/src/test/java/org/apache/pulsar/common/util => pulsar-broker/src/test/java/org/apache/pulsar/broker/stats}/TimeWindowTest.java (95%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/AsyncTokenBucket.java (99%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/AsyncTokenBucketBuilder.java (97%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/DefaultMonotonicSnapshotClock.java (98%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/DynamicRateAsyncTokenBucket.java (98%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/DynamicRateAsyncTokenBucketBuilder.java (98%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/FinalRateAsyncTokenBucket.java (98%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/FinalRateAsyncTokenBucketBuilder.java (98%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/MonotonicSnapshotClock.java (98%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common/util}/qos/package-info.java (95%) rename {pulsar-broker/src/test/java/org/apache/pulsar/broker => pulsar-common/src/test/java/org/apache/pulsar/common/util}/qos/AsyncTokenBucketTest.java (98%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index e71aa821c5c55..3b9b4af13f455 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -19,17 +19,17 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.VisibleForTesting; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.proto.MLDataFormats; -import org.apache.pulsar.common.util.TimeWindow; -import org.apache.pulsar.common.util.WindowWrap; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * OffloadReadHandle is a wrapper of ReadHandle to offload read operations. @@ -37,7 +37,7 @@ public final class OffloadReadHandle implements ReadHandle { private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); private static volatile long flowPermits = -1L; - private static volatile TimeWindow window; + private static volatile AsyncTokenBucket tokenBucket; private final ReadHandle delegate; private final long averageEntrySize; @@ -57,7 +57,10 @@ private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config, private static void initialize(ManagedLedgerConfig config) { if (INITIALIZED.compareAndSet(false, true)) { flowPermits = config.getManagedLedgerOffloadFlowPermitsPerSecond(); - window = new TimeWindow<>(2, 1000); + if (flowPermits > 0) { + tokenBucket = AsyncTokenBucket.builder().initialTokens(0).capacity(2 * flowPermits) + .rate(flowPermits).build(); + } } } @@ -144,30 +147,22 @@ private static synchronized long calculateDelayMillis(long numBytes) { return 0; } - WindowWrap wrap = window.current(__ -> new AtomicLong(0)); - if (wrap == null) { - // it should never goes here - return 0; - } - AtomicLong counter = wrap.value(); - - long delayMillis = 0; - // Cannot use `counter.addAndGet(bytes) >= flowPermits` directly. - // If flowPermits is less than the bytes of a single entry or a read request, - // the read request will be blocked forever. - if (counter.get() >= flowPermits) { - // park until next window start - long end = wrap.start() + wrap.interval(); - delayMillis = end - System.currentTimeMillis(); + if (tokenBucket.containsTokens(true)) { + long token = tokenBucket.getTokens(); + if (token > 0) { + // To prevent flowPermits is less than each batch size. + tokenBucket.consumeTokens(numBytes); + return 0; + } } - counter.addAndGet(numBytes); - return delayMillis; + + return TimeUnit.NANOSECONDS.toMillis(tokenBucket.calculateThrottlingDuration()); } @VisibleForTesting public void reset() { INITIALIZED.set(false); flowPermits = -1L; - window = null; + tokenBucket = null; } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java index fe2cbd6a3d1b6..31323baab729e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -43,7 +43,6 @@ public Object[][] permits() { return new Object[][]{ {-1L}, {0L}, - {50}, {100L}, {10000L} }; @@ -88,7 +87,6 @@ public void testFlowPermits(long flowPermits) throws Exception { Assert.assertEquals(actualDuration, 1000D, 1000D); } else if (flowPermits == 100L || flowPermits == 50L) { long expectDuration = TimeUnit.SECONDS.toMillis(8); - Assert.assertTrue(actualDuration >= expectDuration); Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); } else if (flowPermits == 10000L) { Assert.assertEquals(actualDuration, 1000D, 1000D); diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java index 4c069e72ea3ba..d17c74819c448 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java @@ -20,6 +20,9 @@ package org.apache.pulsar.broker.qos; import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.DefaultMonotonicSnapshotClock; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c1137bcfc25b7..38c1d1f07f382 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -92,8 +92,6 @@ import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; -import org.apache.pulsar.broker.qos.DefaultMonotonicSnapshotClock; -import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager; @@ -150,6 +148,8 @@ import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.common.util.ThreadDumpUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.apache.pulsar.common.util.qos.DefaultMonotonicSnapshotClock; +import org.apache.pulsar.common.util.qos.MonotonicSnapshotClock; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index a733db555a351..332960438cb4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -18,12 +18,12 @@ */ package org.apache.pulsar.broker.resourcegroup; -import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.apache.pulsar.common.util.qos.MonotonicSnapshotClock; public class ResourceGroupPublishLimiter extends PublishRateLimiterImpl { private volatile long publishMaxMessageRate; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 8255d9b6931ff..ba182d88896fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -25,10 +25,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; -import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.MonotonicSnapshotClock; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index b29cbcd660db1..22f9d29641c7a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -22,12 +22,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index b1de10e73b76f..459a25198efda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -24,11 +24,11 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TimeWindow.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java similarity index 99% rename from pulsar-common/src/main/java/org/apache/pulsar/common/util/TimeWindow.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java index aaa1865a3971b..a7099c7fb5637 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TimeWindow.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.util; +package org.apache.pulsar.broker.stats; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.locks.Lock; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/WindowWrap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java similarity index 97% rename from pulsar-common/src/main/java/org/apache/pulsar/common/util/WindowWrap.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java index 29d3a871132b4..12869b82921e5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/WindowWrap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.util; +package org.apache.pulsar.broker.stats; public final class WindowWrap { private final long interval; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index d99c91d2f3dd1..124f0d3e54e4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -44,8 +44,8 @@ import org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.common.util.TimeWindow; -import org.apache.pulsar.common.util.WindowWrap; +import org.apache.pulsar.broker.stats.TimeWindow; +import org.apache.pulsar.broker.stats.WindowWrap; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java index 39be56e3f41cf..1461ed2d705c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java @@ -26,7 +26,7 @@ import static org.testng.Assert.assertEquals; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java index ec952a7ca7734..12e1f362990cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.testng.annotations.Test; public class PublishRateLimiterDisableTest { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 747ef3b7f5ce8..9a18521cc0a73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -33,7 +33,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.DispatchRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java index 721d049342552..8572b89060e91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeoutException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.policies.data.PublishRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 402b5c4972ce2..4c6b06d5d0234 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -54,7 +54,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 4edcb894c5854..d3891931496c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -81,7 +81,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.util.TimeWindow; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.zookeeper.CreateMode; diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/TimeWindowTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java similarity index 95% rename from pulsar-common/src/test/java/org/apache/pulsar/common/util/TimeWindowTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java index 6df12edd73fb1..4802d5cdb5a9f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/TimeWindowTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java @@ -16,10 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.util; +package org.apache.pulsar.broker.stats; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; + +import org.apache.pulsar.broker.stats.TimeWindow; +import org.apache.pulsar.broker.stats.WindowWrap; import org.testng.annotations.Test; public class TimeWindowTest { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 360d27f64133d..073606245c58d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -49,7 +49,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java index 1c0ae5547d53b..ba07af58c94cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java @@ -34,7 +34,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.common.policies.data.PublishRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.common.util.qos.AsyncTokenBucket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucket.java similarity index 99% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucket.java index ac9a1f03e592b..a95ea134490b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucket.java @@ -16,8 +16,7 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBuilder.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBuilder.java index ee256d5a37d64..4a1135efeb57a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; // CHECKSTYLE.OFF: ClassTypeParameterName public abstract class AsyncTokenBucketBuilder> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DefaultMonotonicSnapshotClock.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DefaultMonotonicSnapshotClock.java index df3843921ed55..41a037380ad14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DefaultMonotonicSnapshotClock.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; @@ -51,7 +51,6 @@ public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier cl thread.start(); } - /** {@inheritDoc} */ @Override public long getTickNanos(boolean requestSnapshot) { if (requestSnapshot) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucket.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucket.java index 8edc73d1f51e3..b04875889a4ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucket.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.function.LongSupplier; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucketBuilder.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucketBuilder.java index 22270484c72f0..68c5455fcdede 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/DynamicRateAsyncTokenBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.function.LongSupplier; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucket.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucket.java index 627c5ee1334b2..f222a50abeaa7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucket.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; /** * A subclass of {@link AsyncTokenBucket} that represents a token bucket with a rate which is final. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucketBuilder.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucketBuilder.java index ff4ed53c6c7fa..b8c91ad708786 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/FinalRateAsyncTokenBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; /** * A builder class for creating instances of {@link FinalRateAsyncTokenBucket}. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/MonotonicSnapshotClock.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/MonotonicSnapshotClock.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/MonotonicSnapshotClock.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/MonotonicSnapshotClock.java index 8f61bd5125b5f..c326b7b9a1290 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/MonotonicSnapshotClock.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/MonotonicSnapshotClock.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; /** * An interface representing a clock that provides a monotonic counter in nanoseconds. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/package-info.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/package-info.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/package-info.java index 1078d86894efe..9c4ab0353dcb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/package-info.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/qos/package-info.java @@ -19,4 +19,4 @@ /** * Pulsar broker Quality of Service (QoS) related classes. */ -package org.apache.pulsar.broker.qos; \ No newline at end of file +package org.apache.pulsar.common.util.qos; \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketTest.java index b446f9e902f2a..d3c53aad50255 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import static org.testng.Assert.assertEquals; import java.util.concurrent.TimeUnit; From 5bdae6109bc7900d3cc3f5e93ab6b0520a252e68 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 29 Mar 2024 22:56:38 +0800 Subject: [PATCH 4/7] revert TimeWindow changes. --- .../pulsar/broker/stats/TimeWindow.java | 72 ++++--------------- .../pulsar/broker/stats/TimeWindowTest.java | 3 - 2 files changed, 15 insertions(+), 60 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java index a7099c7fb5637..e2894a2bcdb07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java @@ -19,8 +19,6 @@ package org.apache.pulsar.broker.stats; import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public final class TimeWindow { @@ -28,91 +26,51 @@ public final class TimeWindow { private final int sampleCount; private final AtomicReferenceArray> array; - private final Lock updateLock = new ReentrantLock(); - public TimeWindow(int sampleCount, int interval) { this.sampleCount = sampleCount; this.interval = interval; this.array = new AtomicReferenceArray<>(sampleCount); } - - public WindowWrap current(Function function) { - return current(function, System.currentTimeMillis()); - } - - /** * return current time window data. * * @param function generate data. * @return */ - public WindowWrap current(Function function, long timeMillis) { - if (timeMillis < 0) { + public synchronized WindowWrap current(Function function) { + long millis = System.currentTimeMillis(); + + if (millis < 0) { return null; } - - int idx = calculateTimeIdx(timeMillis); - // Calculate current bucket start time. - long windowStart = calculateWindowStart(timeMillis); + int idx = calculateTimeIdx(millis); + long windowStart = calculateWindowStart(millis); while (true) { WindowWrap old = array.get(idx); if (old == null) { - WindowWrap window = new WindowWrap<>(interval, windowStart, function.apply(null)); + WindowWrap window = new WindowWrap<>(interval, windowStart, null); if (array.compareAndSet(idx, null, window)) { + T value = null == function ? null : function.apply(null); + window.value(value); return window; } else { - // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.start()) { return old; } else if (windowStart > old.start()) { - if (updateLock.tryLock()) { - try { - // Successfully get the update lock, now we reset the bucket. - T value = null == function ? null : function.apply(old.value()); - old.value(value); - old.resetWindowStart(windowStart); - return old; - } finally { - updateLock.unlock(); - } - } else { - // Contention failed, the thread will yield its time slice to wait for bucket available. - Thread.yield(); - } + T value = null == function ? null : function.apply(old.value()); + old.value(value); + old.resetWindowStart(windowStart); + return old; } else { - //when windowStart < old.value() - // Should not go through here, as the provided time is already behind. + //it should never goes here throw new IllegalStateException(); } } } - /** - * return next time window data. - * - * @param function generate data. - */ - public WindowWrap next(Function function, long timeMillis) { - if (this.sampleCount <= 1) { - throw new IllegalStateException("Argument sampleCount cannot less than 2"); - } - return this.current(function, timeMillis + interval); - } - - - /** - * return next time window data. - * - * @param function generate data. - */ - public WindowWrap next(Function function) { - return this.next(function, System.currentTimeMillis()); - } - private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / this.interval; return (int) (timeId % sampleCount); @@ -133,4 +91,4 @@ public int interval() { public long currentWindowStart(long millis) { return this.calculateWindowStart(millis); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java index 4802d5cdb5a9f..89528c1965397 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java @@ -20,9 +20,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; - -import org.apache.pulsar.broker.stats.TimeWindow; -import org.apache.pulsar.broker.stats.WindowWrap; import org.testng.annotations.Test; public class TimeWindowTest { From b23a096c2b1c328140a3c85cab47fbb625aa7e3c Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 29 Mar 2024 23:00:44 +0800 Subject: [PATCH 5/7] Fix imports --- .../org/apache/bookkeeper/mledger/OffloadReadHandle.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index 3b9b4af13f455..9a2e1a06da393 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -19,6 +19,10 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.VisibleForTesting; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -26,11 +30,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.pulsar.common.util.qos.AsyncTokenBucket; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * OffloadReadHandle is a wrapper of ReadHandle to offload read operations. */ From ada5b82bca7abe5ebc64a3323c4c67d39b578c88 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 29 Mar 2024 23:08:57 +0800 Subject: [PATCH 6/7] Fix checkstyle --- microbench/pom.xml | 2 +- .../util}/qos/AsyncTokenBucketBenchmark.java | 5 +---- .../pulsar/{broker => common/util}/qos/package-info.java | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) rename microbench/src/main/java/org/apache/pulsar/{broker => common/util}/qos/AsyncTokenBucketBenchmark.java (94%) rename microbench/src/main/java/org/apache/pulsar/{broker => common/util}/qos/package-info.java (95%) diff --git a/microbench/pom.xml b/microbench/pom.xml index a568e716ba0fa..b7a2403694e41 100644 --- a/microbench/pom.xml +++ b/microbench/pom.xml @@ -94,7 +94,7 @@ ${project.groupId} - pulsar-broker + pulsar-common ${project.version} diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBenchmark.java similarity index 94% rename from microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java rename to microbench/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBenchmark.java index d17c74819c448..a2bd07763d9b4 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/util/qos/AsyncTokenBucketBenchmark.java @@ -17,12 +17,9 @@ * under the License. */ -package org.apache.pulsar.broker.qos; +package org.apache.pulsar.common.util.qos; import java.util.concurrent.TimeUnit; - -import org.apache.pulsar.common.util.qos.AsyncTokenBucket; -import org.apache.pulsar.common.util.qos.DefaultMonotonicSnapshotClock; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/package-info.java b/microbench/src/main/java/org/apache/pulsar/common/util/qos/package-info.java similarity index 95% rename from microbench/src/main/java/org/apache/pulsar/broker/qos/package-info.java rename to microbench/src/main/java/org/apache/pulsar/common/util/qos/package-info.java index ccea21a210f86..9e8e31216d2b7 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/package-info.java +++ b/microbench/src/main/java/org/apache/pulsar/common/util/qos/package-info.java @@ -19,4 +19,4 @@ /** * Benchmarks for Pulsar broker Quality of Service (QoS) related classes. */ -package org.apache.pulsar.broker.qos; \ No newline at end of file +package org.apache.pulsar.common.util.qos; \ No newline at end of file From dedaac830d388c56c59375f816e5c07189cd8278 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 3 Apr 2024 14:41:38 +0800 Subject: [PATCH 7/7] improve tests --- .../mledger/OffloadReadHandleTest.java | 112 ++++++++++++------ 1 file changed, 77 insertions(+), 35 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java index 31323baab729e..174252c647679 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -23,7 +23,9 @@ import io.netty.buffer.ByteBufAllocator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -43,6 +45,7 @@ public Object[][] permits() { return new Object[][]{ {-1L}, {0L}, + {50L}, {100L}, {10000L} }; @@ -50,51 +53,90 @@ public Object[][] permits() { @Test(dataProvider = "flowPermits") public void testFlowPermits(long flowPermits) throws Exception { - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100); - for (int a = 0; a < 100; a++) { - buf.writeByte(0); - } - LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf); - List entryList = Lists.newArrayList(entry); - OffloadReadHandle h = null; - try (LedgerEntries entries = LedgerEntriesImpl.create(entryList)) { - ReadHandle handle = Mockito.mock(ReadHandle.class); - Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle) - .readAsync(Mockito.anyLong(), Mockito.anyLong()); - + OffloadReadHandle handle = (OffloadReadHandle) initializeReadHandle(flowPermits); + try { long start = System.currentTimeMillis(); - ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits); - - CompletableFuture future = OffloadReadHandle.create(handle, config, - MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(1) - .setEntries(1).setSize(100).build()); - h = (OffloadReadHandle) future.get(); - - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); - h.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); + handle.read(1, 1); long actualDuration = System.currentTimeMillis() - start; if (flowPermits <= 0L) { - Assert.assertEquals(actualDuration, 1000D, 1000D); - } else if (flowPermits == 100L || flowPermits == 50L) { - long expectDuration = TimeUnit.SECONDS.toMillis(8); + Assert.assertEquals(actualDuration, 4000D, 4000D); + } else if (flowPermits == 50L) { + long expectDuration = 8000; + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } else if (flowPermits == 100L) { + long expectDuration = 4000; Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); } else if (flowPermits == 10000L) { Assert.assertEquals(actualDuration, 1000D, 1000D); } } finally { - if (null != h) { - h.reset(); + handle.close(); + handle.reset(); + } + } + + + @Test + public void testOffloadFlowPermitsMultiThreads() throws Exception { + OffloadReadHandle handle = (OffloadReadHandle) initializeReadHandle(1000); + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(10); + try { + long start = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) { + new Thread(() -> { + for (int j = 0; j < 10; j++) { + try { + handle.read(1, 1); + } catch (Exception e) { + failed.set(true); + } + } + latch.countDown(); + }).start(); } + + latch.await(); + + Assert.assertFalse(failed.get()); + long actualDuration = System.currentTimeMillis() - start; + long expectDuration = TimeUnit.SECONDS.toMillis(10); + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } finally { + handle.close(); + handle.reset(); + } + } + + + private ReadHandle initializeReadHandle(long flowPermits) throws Exception { + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100); + for (int a = 0; a < 100; a++) { + buf.writeByte(0); } + LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf); + List entryList = Lists.newArrayList(entry); + LedgerEntries entries = LedgerEntriesImpl.create(entryList); + ReadHandle handle = Mockito.mock(ReadHandle.class); + Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle) + .readAsync(Mockito.anyLong(), Mockito.anyLong()); + Mockito.doAnswer(inv -> { + entries.close(); + return CompletableFuture.completedFuture(null); + }).when(handle).closeAsync(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits); + + CompletableFuture future = OffloadReadHandle.create(handle, config, + MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(1) + .setEntries(1).setSize(100).build()); + + return future.get(); } } \ No newline at end of file