From b22d8d9cdb51a39dbffee1fbcc7dd9b47a14efda Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 6 Sep 2022 14:00:49 +0800 Subject: [PATCH 01/15] Add offload threshold --- .../mledger/FlowControllableReadHandle.java | 157 ++++++++++++++++++ .../mledger/ManagedLedgerConfig.java | 19 +++ .../mledger/impl/ManagedLedgerImpl.java | 48 +++--- .../bookkeeper/mledger/util}/TimeWindow.java | 72 ++++++-- .../bookkeeper/mledger/util}/WindowWrap.java | 2 +- .../bookkeeper/test}/TimeWindowTest.java | 7 +- .../pulsar/broker/ServiceConfiguration.java | 5 + .../pulsar/broker/service/BrokerService.java | 2 + .../PrometheusMetricsGenerator.java | 4 +- 9 files changed, 272 insertions(+), 44 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java rename {pulsar-broker/src/main/java/org/apache/pulsar/broker/stats => managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util}/TimeWindow.java (53%) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker/stats => managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util}/WindowWrap.java (97%) rename {pulsar-broker/src/test/java/org/apache/pulsar/broker/stats => managed-ledger/src/test/java/org/apache/bookkeeper/test}/TimeWindowTest.java (95%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java new file mode 100644 index 0000000000000..a9dd533dd8d65 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java @@ -0,0 +1,157 @@ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.util.TimeWindow; +import org.apache.bookkeeper.mledger.util.WindowWrap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +public class FlowControllableReadHandle implements ReadHandle { + private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); + private static volatile long permittedBytes0; + private static volatile TimeWindow window0; + + private final ReadHandle delegator; + + private FlowControllableReadHandle(ReadHandle handle, long permittedBytes) { + this.delegator = handle; + if (INITIALIZED.compareAndSet(false, true)) { + permittedBytes0 = permittedBytes; + window0 = new TimeWindow<>(2, Long.valueOf(TimeUnit.SECONDS.toMillis(1)).intValue()); + } + } + + public static CompletableFuture create(ReadHandle handle, long permitBytesPerMin) { + return CompletableFuture.completedFuture(new FlowControllableReadHandle(handle, permitBytesPerMin)); + } + + @Override + public CompletableFuture readAsync(long firstEntry, long lastEntry) { + if (!checkFlow()) { + return this.readAsync(firstEntry, lastEntry); + } + + return this.delegator + .readAsync(firstEntry, lastEntry) + .whenComplete((v, t) -> { + if (t == null) { + AtomicLong total = new AtomicLong(0); + v.forEach(entry -> total.addAndGet(entry.getLength())); + recordReadBytes(total.get()); + } + }); + } + + @Override + public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { + if 
(!checkFlow()) { + return this.readUnconfirmedAsync(firstEntry, lastEntry); + } + + return this.delegator + .readUnconfirmedAsync(firstEntry, lastEntry) + .whenComplete((v, t) -> { + if (t == null) { + AtomicLong total = new AtomicLong(0); + v.forEach(entry -> total.addAndGet(entry.getLength())); + recordReadBytes(total.get()); + } + }); + } + + @Override + public CompletableFuture readLastAddConfirmedAsync() { + return this.delegator.readLastAddConfirmedAsync(); + } + + @Override + public CompletableFuture tryReadLastAddConfirmedAsync() { + return this.delegator.tryReadLastAddConfirmedAsync(); + } + + @Override + public long getLastAddConfirmed() { + return this.delegator.getLastAddConfirmed(); + } + + @Override + public long getLength() { + return this.delegator.getLength(); + } + + @Override + public boolean isClosed() { + return this.delegator.isClosed(); + } + + @Override + public CompletableFuture readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) { + if (!checkFlow()) { + return this.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); + } + + return this.delegator + .readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel) + .whenComplete((v, t) -> { + if (t == null) { + recordReadBytes(v.getEntry().getLength()); + } + }); + } + + @Override + public long getId() { + return this.delegator.getId(); + } + + @Override + public CompletableFuture closeAsync() { + return this.delegator.closeAsync(); + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return this.delegator.getLedgerMetadata(); + } + + + private static boolean checkFlow() { + if (permittedBytes0 <= 0) { + return true; + } + + WindowWrap wrap = window0.current(__ -> new AtomicLong(0)); + if (wrap == null) { + // it should never goes here + return true; + } + + if (wrap.value().get() >= permittedBytes0) { + // park until next window start + LockSupport.parkUntil(TimeUnit.MILLISECONDS.toNanos(wrap.start() + wrap.interval())); + return false; + } + + return true; + } + + private static void recordReadBytes(long bytes) { + if (permittedBytes0 <= 0) { + return; + } + + WindowWrap wrap = window0.current(__ -> new AtomicLong(0)); + if (wrap == null) { + // it should never goes here + return; + } + + wrap.value().addAndGet(bytes); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 92c9c91198134..7487f32f72c68 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -84,6 +84,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + private long globalOffloadingPermitBytesPerSecond = 0; public boolean isCreateIfMissing() { return createIfMissing; @@ -742,4 +743,22 @@ public int getMaxBacklogBetweenCursorsForCaching() { public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) { this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching; } + + /** + * Set permitted size to offload on the broker. 
+ * + * @param globalOffloadingPermitBytesPerSecond + */ + public void setGlobalOffloadingPermitBytesPerSecond(long globalOffloadingPermitBytesPerSecond) { + this.globalOffloadingPermitBytesPerSecond = globalOffloadingPermitBytesPerSecond; + } + + /** + * Get permitted size to offload on the broker. + * + * @return + */ + public long getGlobalOffloadingPermitBytesPerSecond() { + return globalOffloadingPermitBytesPerSecond; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 84a7377841c54..9fd69dd166645 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -94,6 +94,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.FlowControllableReadHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -2972,7 +2973,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct } private void offloadLoop(CompletableFuture promise, Queue ledgersToOffload, - PositionImpl firstUnoffloaded, Optional firstError) { + PositionImpl firstUnoffloaded, Optional firstError) { if (getState() == State.Closed) { promise.completeExceptionally(new ManagedLedgerAlreadyClosedException( String.format("managed ledger [%s] has already closed", name))); @@ -2991,29 +2992,30 @@ private void offloadLoop(CompletableFuture promise, Queue extraMetadata = Map.of("ManagedLedgerName", name); String driverName = config.getLedgerOffloader().getOffloadDriverName(); + long permittedBytesToOffload = config.getGlobalOffloadingPermitBytesPerSecond(); Map driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) - .thenCompose((ignore) -> getLedgerHandle(ledgerId)) - .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) - .thenCompose((ignore) -> { - return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), - TimeUnit.SECONDS.toHours(1)).limit(10), - FAIL_ON_CONFLICT, - () -> completeLedgerInfoForOffloaded(ledgerId, uuid), - scheduledExecutor, name) - .whenComplete((ignore2, exception) -> { - if (exception != null) { - log.error("[{}] Failed to offload data for the ledgerId {}", - name, ledgerId, exception); - cleanupOffloaded( - ledgerId, uuid, - driverName, driverMetadata, - "Metastore failure"); - } - }); - }) - .whenComplete((ignore, exception) -> { + .thenCompose((ignore) -> getLedgerHandle(ledgerId)) + .thenCompose(handle -> FlowControllableReadHandle.create(handle, permittedBytesToOffload)) + .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) + .thenCompose((ignore) -> + Retries.run( + Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), + TimeUnit.SECONDS.toHours(1)).limit(10), FAIL_ON_CONFLICT, + () -> completeLedgerInfoForOffloaded(ledgerId, uuid), + scheduledExecutor, name) + .whenComplete((ignore2, exception) -> { + if (exception != null) { + log.error("[{}] Failed to offload data for the ledgerId {}", + name, ledgerId, 
exception); + cleanupOffloaded( + ledgerId, uuid, + driverName, driverMetadata, + "Metastore failure"); + } + })) + .whenComplete((ignore, exception) -> { if (exception != null) { lastOffloadFailureTimestamp = System.currentTimeMillis(); log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, @@ -3030,9 +3032,7 @@ private void offloadLoop(CompletableFuture promise, Queue { @@ -26,51 +28,91 @@ public final class TimeWindow { private final int sampleCount; private final AtomicReferenceArray> array; + private final Lock updateLock = new ReentrantLock(); + public TimeWindow(int sampleCount, int interval) { this.sampleCount = sampleCount; this.interval = interval; this.array = new AtomicReferenceArray<>(sampleCount); } + + public WindowWrap current(Function function) { + return current(function, System.currentTimeMillis()); + } + + /** * return current time window data. * * @param function generate data. * @return */ - public synchronized WindowWrap current(Function function) { - long millis = System.currentTimeMillis(); - - if (millis < 0) { + public WindowWrap current(Function function, long timeMillis) { + if (timeMillis < 0) { return null; } - int idx = calculateTimeIdx(millis); - long windowStart = calculateWindowStart(millis); + + int idx = calculateTimeIdx(timeMillis); + // Calculate current bucket start time. + long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap old = array.get(idx); if (old == null) { - WindowWrap window = new WindowWrap<>(interval, windowStart, null); + WindowWrap window = new WindowWrap<>(interval, windowStart, function.apply(null)); if (array.compareAndSet(idx, null, window)) { - T value = null == function ? null : function.apply(null); - window.value(value); return window; } else { + // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.start()) { return old; } else if (windowStart > old.start()) { - T value = null == function ? null : function.apply(old.value()); - old.value(value); - old.resetWindowStart(windowStart); - return old; + if (updateLock.tryLock()) { + try { + // Successfully get the update lock, now we reset the bucket. + T value = null == function ? null : function.apply(old.value()); + old.value(value); + old.resetWindowStart(windowStart); + return old; + } finally { + updateLock.unlock(); + } + } else { + // Contention failed, the thread will yield its time slice to wait for bucket available. + Thread.yield(); + } } else { - //it should never goes here + //when windowStart < old.value() + // Should not go through here, as the provided time is already behind. throw new IllegalStateException(); } } } + /** + * return next time window data. + * + * @param function generate data. + */ + public WindowWrap next(Function function, long timeMillis) { + if (this.sampleCount <= 1) { + throw new IllegalStateException("Argument sampleCount cannot less than 2"); + } + return this.current(function, timeMillis + interval); + } + + + /** + * return next time window data. + * + * @param function generate data. 
+ */ + public WindowWrap next(Function function) { + return this.next(function, System.currentTimeMillis()); + } + private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / this.interval; return (int) (timeId % sampleCount); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/WindowWrap.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/WindowWrap.java index 45bcb093e7405..d489e76847b6a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/WindowWrap.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.bookkeeper.mledger.util; public final class WindowWrap { private final long interval; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/TimeWindowTest.java similarity index 95% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java rename to managed-ledger/src/test/java/org/apache/bookkeeper/test/TimeWindowTest.java index 26812e495f0d3..b74d1491a3c10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/TimeWindowTest.java @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.bookkeeper.test; + +import org.apache.bookkeeper.mledger.util.TimeWindow; +import org.apache.bookkeeper.mledger.util.WindowWrap; +import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import org.testng.annotations.Test; public class TimeWindowTest { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 30ca29faad889..c75a76942336c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "The number of bytes before triggering automatic offload to long term storage" ) private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The number of bytes permitted to offload on this broker" + ) + private long managedLedgerGlobalOffloadingPermitBytesPerSecond = 0L; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a cursor ledger" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7c0afdb712a28..0cdd9a539f5b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1676,6 +1676,8 @@ public CompletableFuture 
getManagedLedgerConfig(TopicName t serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + managedLedgerConfig.setGlobalOffloadingPermitBytesPerSecond( + serviceConfig.getManagedLedgerGlobalOffloadingPermitBytesPerSecond()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 25f227955be72..7c2db3233963f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -42,12 +42,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.util.TimeWindow; +import org.apache.bookkeeper.mledger.util.WindowWrap; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.TimeWindow; -import org.apache.pulsar.broker.stats.WindowWrap; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; From 0f49901bdf801e5d846ec9d8b9571089dbd14bf0 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 6 Sep 2022 14:01:19 +0800 Subject: [PATCH 02/15] ADD license header --- .../mledger/FlowControllableReadHandle.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java index a9dd533dd8d65..cddb7dcd2169a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.bookkeeper.mledger; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; From 54ef21037d0a05e92adbb98173577d26eea28415 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 8 Sep 2022 01:37:22 +0800 Subject: [PATCH 03/15] fix code formats --- .../mledger/impl/ManagedLedgerImpl.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9fd69dd166645..42e39ed447d65 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2992,29 +2992,27 @@ private void offloadLoop(CompletableFuture promise, Queue extraMetadata = Map.of("ManagedLedgerName", name); String driverName = config.getLedgerOffloader().getOffloadDriverName(); - long permittedBytesToOffload = config.getGlobalOffloadingPermitBytesPerSecond(); + final long flowPermits = config.getGlobalOffloadingPermitBytesPerSecond(); Map driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) .thenCompose((ignore) -> getLedgerHandle(ledgerId)) - .thenCompose(handle -> FlowControllableReadHandle.create(handle, permittedBytesToOffload)) + .thenCompose(handle -> FlowControllableReadHandle.create(handle, flowPermits)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) - .thenCompose((ignore) -> - Retries.run( - Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), - TimeUnit.SECONDS.toHours(1)).limit(10), FAIL_ON_CONFLICT, - () -> completeLedgerInfoForOffloaded(ledgerId, uuid), - scheduledExecutor, name) - .whenComplete((ignore2, exception) -> { - if (exception != null) { - log.error("[{}] Failed to offload data for the ledgerId {}", - name, ledgerId, exception); - cleanupOffloaded( - ledgerId, uuid, - driverName, driverMetadata, - "Metastore failure"); - } - })) + .thenCompose((ignore) -> { + return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), + TimeUnit.SECONDS.toHours(1)).limit(10), + FAIL_ON_CONFLICT, + () -> completeLedgerInfoForOffloaded(ledgerId, uuid), + scheduledExecutor, name) + .whenComplete((ignore2, exception) -> { + if (exception != null) { + log.error("[{}] Failed to offload data for the ledgerId {}", + name, ledgerId, exception); + cleanupOffloaded(ledgerId, uuid, driverName, driverMetadata, "Metastore failure"); + } + }); + }) .whenComplete((ignore, exception) -> { if (exception != null) { lastOffloadFailureTimestamp = System.currentTimeMillis(); From 2b0aba067166a92f94609181bd7f67bc1f5f4d4b Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 13 Sep 2022 16:12:56 +0800 Subject: [PATCH 04/15] fix checkstyle and add Test --- .../mledger/FlowControllableReadHandle.java | 21 +++--- .../mledger/impl/ManagedLedgerImpl.java | 3 +- .../FlowControllableReadHandleTest.java | 66 +++++++++++++++++++ .../broker/stats/PrometheusMetricsTest.java | 1 + 4 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/FlowControllableReadHandleTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java index 
cddb7dcd2169a..bbdd892be7d10 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java @@ -18,17 +18,17 @@ */ package org.apache.bookkeeper.mledger; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.util.TimeWindow; import org.apache.bookkeeper.mledger.util.WindowWrap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.LockSupport; public class FlowControllableReadHandle implements ReadHandle { private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); @@ -45,8 +45,8 @@ private FlowControllableReadHandle(ReadHandle handle, long permittedBytes) { } } - public static CompletableFuture create(ReadHandle handle, long permitBytesPerMin) { - return CompletableFuture.completedFuture(new FlowControllableReadHandle(handle, permitBytesPerMin)); + public static CompletableFuture create(ReadHandle handle, long permitBytes) { + return CompletableFuture.completedFuture(new FlowControllableReadHandle(handle, permitBytes)); } @Override @@ -109,7 +109,8 @@ public boolean isClosed() { } @Override - public CompletableFuture readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) { + public CompletableFuture readLastAddConfirmedAndEntryAsync( + long entryId, long timeOutInMillis, boolean parallel) { if (!checkFlow()) { return this.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); } @@ -152,7 +153,9 @@ private static boolean checkFlow() { if (wrap.value().get() >= permittedBytes0) { // park until next window start - LockSupport.parkUntil(TimeUnit.MILLISECONDS.toNanos(wrap.start() + wrap.interval())); + long end = wrap.start() + wrap.interval(); + long parkMillis = end - System.currentTimeMillis(); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(parkMillis)); return false; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 42e39ed447d65..3f0185897242a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3009,7 +3009,8 @@ private void offloadLoop(CompletableFuture promise, Queue CompletableFuture.completedFuture(entries)).when(handle).readAsync(1, 1); + + long start = System.currentTimeMillis(); + CompletableFuture future = FlowControllableReadHandle.create(handle, 100); + ReadHandle h = future.get(); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + h.readAsync(1, 1); + + Assert.assertTrue(System.currentTimeMillis() - start > TimeUnit.SECONDS.toMillis(8)); + } + +} diff 
--git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index eb01e35523aa8..76ad7c4daf564 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -53,6 +53,7 @@ import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.util.TimeWindow; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; From 33b503ab926d26ef048b4e6d867d913dc15737b4 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 20 Sep 2022 20:51:42 +0800 Subject: [PATCH 05/15] reimpl --- ...ReadHandle.java => OffloadReadHandle.java} | 116 +++++++++++------- .../mledger/impl/ManagedLedgerImpl.java | 4 +- ...leTest.java => OffloadReadHandleTest.java} | 4 +- 3 files changed, 78 insertions(+), 46 deletions(-) rename managed-ledger/src/main/java/org/apache/bookkeeper/mledger/{FlowControllableReadHandle.java => OffloadReadHandle.java} (57%) rename managed-ledger/src/test/java/org/apache/bookkeeper/mledger/{FlowControllableReadHandleTest.java => OffloadReadHandleTest.java} (94%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java similarity index 57% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index bbdd892be7d10..748374bf261e8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/FlowControllableReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -18,69 +18,73 @@ */ package org.apache.bookkeeper.mledger; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.util.TimeWindow; import org.apache.bookkeeper.mledger.util.WindowWrap; -public class FlowControllableReadHandle implements ReadHandle { +public class OffloadReadHandle implements ReadHandle { private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); private static volatile long permittedBytes0; private static volatile TimeWindow window0; + private static final int MAX_PROCESSING_CMDS = 1024; + private static final AtomicInteger PROCESSING_CMDS = new AtomicInteger(0); private final ReadHandle delegator; + private final OrderedScheduler scheduler; - private FlowControllableReadHandle(ReadHandle handle, long permittedBytes) { + private OffloadReadHandle(ReadHandle handle, long permittedBytes, OrderedScheduler scheduler) { this.delegator = handle; if (INITIALIZED.compareAndSet(false, true)) { permittedBytes0 = 
permittedBytes; window0 = new TimeWindow<>(2, Long.valueOf(TimeUnit.SECONDS.toMillis(1)).intValue()); } + + this.scheduler = Objects.requireNonNull(scheduler); } - public static CompletableFuture create(ReadHandle handle, long permitBytes) { - return CompletableFuture.completedFuture(new FlowControllableReadHandle(handle, permitBytes)); + public static CompletableFuture create(ReadHandle handle, long permitBytes, OrderedScheduler scheduler) { + return CompletableFuture.completedFuture(new OffloadReadHandle(handle, permitBytes, scheduler)); } @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { - if (!checkFlow()) { - return this.readAsync(firstEntry, lastEntry); + final long delayMills = calculateDelayMillis(); + if (delayMills > 0) { + if (PROCESSING_CMDS.get() > MAX_PROCESSING_CMDS) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delayMills)); + return readAsync(firstEntry, lastEntry); + } else { + CompletableFuture f = new CompletableFuture<>(); + Runnable cmd = new ReadAsyncCommand(firstEntry, lastEntry, f); + scheduler.schedule(cmd, delayMills, TimeUnit.MILLISECONDS); + PROCESSING_CMDS.incrementAndGet(); + return f; + } } return this.delegator .readAsync(firstEntry, lastEntry) .whenComplete((v, t) -> { if (t == null) { - AtomicLong total = new AtomicLong(0); - v.forEach(entry -> total.addAndGet(entry.getLength())); - recordReadBytes(total.get()); + recordReadBytes(v); } }); } @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { - if (!checkFlow()) { - return this.readUnconfirmedAsync(firstEntry, lastEntry); - } - - return this.delegator - .readUnconfirmedAsync(firstEntry, lastEntry) - .whenComplete((v, t) -> { - if (t == null) { - AtomicLong total = new AtomicLong(0); - v.forEach(entry -> total.addAndGet(entry.getLength())); - recordReadBytes(total.get()); - } - }); + return this.delegator.readUnconfirmedAsync(firstEntry, lastEntry); } @Override @@ -111,17 +115,7 @@ public boolean isClosed() { @Override public CompletableFuture readLastAddConfirmedAndEntryAsync( long entryId, long timeOutInMillis, boolean parallel) { - if (!checkFlow()) { - return this.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); - } - - return this.delegator - .readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel) - .whenComplete((v, t) -> { - if (t == null) { - recordReadBytes(v.getEntry().getLength()); - } - }); + return this.delegator.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); } @Override @@ -140,39 +134,77 @@ public LedgerMetadata getLedgerMetadata() { } - private static boolean checkFlow() { + private static long calculateDelayMillis() { if (permittedBytes0 <= 0) { - return true; + return 0; } WindowWrap wrap = window0.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here - return true; + return 0; } if (wrap.value().get() >= permittedBytes0) { // park until next window start long end = wrap.start() + wrap.interval(); - long parkMillis = end - System.currentTimeMillis(); - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(parkMillis)); - return false; + return end - System.currentTimeMillis(); } - return true; + return 0; } - private static void recordReadBytes(long bytes) { + private static void recordReadBytes(LedgerEntries entries) { if (permittedBytes0 <= 0) { return; } + if (entries == null) { + return; + } + + AtomicLong num = new AtomicLong(0); + entries.forEach(en -> num.addAndGet(en.getLength())); + WindowWrap wrap = 
window0.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return; } - wrap.value().addAndGet(bytes); + wrap.value().addAndGet(num.get()); + } + + + private final class ReadAsyncCommand implements Runnable { + + private final long firstEntry; + private final long lastEntry; + private final CompletableFuture f; + + ReadAsyncCommand(long firstEntry, long lastEntry, CompletableFuture f) { + this.firstEntry = firstEntry; + this.lastEntry = lastEntry; + this.f = f; + } + + @Override + public void run() { + long delayMillis = calculateDelayMillis(); + if (delayMillis > 0) { + scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS); + return; + } + + delegator.readAsync(firstEntry, lastEntry) + .whenComplete((entries, e) -> { + if (e != null) { + f.completeExceptionally(e); + } else { + f.complete(entries); + recordReadBytes(entries); + } + }); + } } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1d75280b0fb7e..a31f7cda58d09 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -93,7 +93,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.FlowControllableReadHandle; +import org.apache.bookkeeper.mledger.OffloadReadHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -2995,7 +2995,7 @@ private void offloadLoop(CompletableFuture promise, Queue getLedgerHandle(ledgerId)) - .thenCompose(handle -> FlowControllableReadHandle.create(handle, flowPermits)) + .thenCompose(handle -> OffloadReadHandle.create(handle, flowPermits)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/FlowControllableReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java similarity index 94% rename from managed-ledger/src/test/java/org/apache/bookkeeper/mledger/FlowControllableReadHandleTest.java rename to managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java index 551abc92177ce..46c684fea0910 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/FlowControllableReadHandleTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -32,7 +32,7 @@ import org.testng.Assert; import org.testng.annotations.Test; -public class FlowControllableReadHandleTest { +public class OffloadReadHandleTest { @Test public void test() throws Exception { @@ -47,7 +47,7 @@ public void test() throws Exception { Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle).readAsync(1, 1); long start = System.currentTimeMillis(); - CompletableFuture future = FlowControllableReadHandle.create(handle, 100); + CompletableFuture future = OffloadReadHandle.create(handle, 100); ReadHandle h = future.get(); h.readAsync(1, 1); h.readAsync(1, 1); From 
a8ce8289f8183c0e11896fbab0af1fce474d9b98 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 28 Sep 2022 23:45:07 +0800 Subject: [PATCH 06/15] Split FileSystem ledger offload task into small tasks. --- .../FileSystemManagedLedgerOffloader.java | 132 ++++++++++-------- 1 file changed, 71 insertions(+), 61 deletions(-) diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 8e87c230adb15..80ea9bd06d7ac 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -23,14 +23,9 @@ import com.google.common.collect.ImmutableMap; import io.netty.util.Recycler; import java.io.IOException; -import java.util.Iterator; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.*; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -151,8 +146,7 @@ public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map promise = new CompletableFuture<>(); scheduler.chooseThread(readHandle.getId()).execute( new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, - assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(), - this.offloaderStats)); + assignmentScheduler, this.offloaderStats)); return promise; } @@ -165,8 +159,7 @@ private static class LedgerReader implements Runnable { private final String storageBasePath; private final Configuration configuration; volatile Exception fileSystemWriteException = null; - private OrderedScheduler assignmentScheduler; - private int managedLedgerOffloadPrefetchRounds = 1; + private final OrderedScheduler assignmentScheduler; private final LedgerOffloaderStats offloaderStats; private LedgerReader(ReadHandle readHandle, @@ -176,7 +169,6 @@ private LedgerReader(ReadHandle readHandle, String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, - int managedLedgerOffloadPrefetchRounds, LedgerOffloaderStats offloaderStats) { this.readHandle = readHandle; this.uuid = uuid; @@ -185,7 +177,6 @@ private LedgerReader(ReadHandle readHandle, this.storageBasePath = storageBasePath; this.configuration = configuration; this.assignmentScheduler = assignmentScheduler; - this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds; this.offloaderStats = offloaderStats; } @@ -212,45 +203,81 @@ public void run() { byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata()); value.set(ledgerMetadata, 0, ledgerMetadata.length); dataWriter.append(key, value); - AtomicLong haveOffloadEntryNumber = new AtomicLong(0); - long needToOffloadFirstEntryNumber = 0; - CountDownLatch countDownLatch; - //avoid prefetch too much data into memory - Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds); - do { - long end = Math.min(needToOffloadFirstEntryNumber + 
ENTRIES_PER_READ - 1, - readHandle.getLastAddConfirmed()); - log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end); - long startReadTime = System.nanoTime(); - LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get(); - long cost = System.nanoTime() - startReadTime; - this.offloaderStats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS); - semaphore.acquire(); - countDownLatch = new CountDownLatch(1); - assignmentScheduler.chooseThread(ledgerId) - .execute(FileSystemWriter.create(ledgerEntriesOnce, - dataWriter, semaphore, countDownLatch, haveOffloadEntryNumber, this)); - needToOffloadFirstEntryNumber = end + 1; - } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() - && fileSystemWriteException == null); - countDownLatch.await(); - if (fileSystemWriteException != null) { - throw fileSystemWriteException; - } - IOUtils.closeStream(dataWriter); - promise.complete(null); + long start = 0; + long end = Math.min(start + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed()); + + // Submit the first LedgerPartitionedReader task. + Executor executor = assignmentScheduler.chooseThread(ledgerId); + executor.execute(new LedgerPartitionedReader(this, dataWriter, executor, start, end)); } catch (Exception e) { log.error("Exception when get CompletableFuture : ManagerLedgerName: {}, " + "LedgerId: {}, UUID: {} ", topicName, ledgerId, uuid, e); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } this.offloaderStats.recordOffloadError(topicName); promise.completeExceptionally(e); } } } + private static final class LedgerPartitionedReader implements Runnable { + + private final ReadHandle readHandle; + private final String topicName; + private final LedgerOffloaderStats stats; + private final MapFile.Writer dataWriter; + private final Executor executor; + private final LedgerReader ledgerReader; + private final CompletableFuture promise; + + private final long startEntry; + private final long endEntry; + + public LedgerPartitionedReader(LedgerReader ledgerReader, MapFile.Writer dataWriter, + Executor executor, long startEntry, long endEntry) { + this.ledgerReader = ledgerReader; + this.readHandle = ledgerReader.readHandle; + this.topicName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME); + this.stats = ledgerReader.offloaderStats; + this.dataWriter = dataWriter; + this.executor = executor; + this.promise = ledgerReader.promise; + + this.startEntry = startEntry; + this.endEntry = endEntry; + } + + @Override + public void run() { + log.debug("read ledger entries. start: {}, end: {}", startEntry, endEntry); + long startReadTime = System.nanoTime(); + + readHandle.readAsync(startEntry, endEntry) + .thenAcceptAsync(entries -> { + long cost = System.nanoTime() - startReadTime; + this.stats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS); + // Execute the FileWrite task on the current thread. 
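+                        // Running the writer inline keeps the read and the write of one entry
+                        // range on the same ordered ledger thread before currentTaskFinished()
+                        // decides whether the whole ledger has been offloaded.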
+ FileSystemWriter.create(entries, dataWriter, ledgerReader).run(); + currentTaskFinished(); + }, this.executor) + .exceptionally(e -> { + log.error(""); + return null; + }); + } + + private void currentTaskFinished() { + if (ledgerReader.fileSystemWriteException != null) { + promise.completeExceptionally(ledgerReader.fileSystemWriteException); + } + + if (this.endEntry == readHandle.getLastAddConfirmed()) { + IOUtils.closeStream(this.dataWriter); + promise.complete(null); + } else { + // TODO submit next task + } + } + } + private static class FileSystemWriter implements Runnable { private LedgerEntries ledgerEntriesOnce; @@ -259,11 +286,8 @@ private static class FileSystemWriter implements Runnable { private final BytesWritable value = new BytesWritable(); private MapFile.Writer dataWriter; - private CountDownLatch countDownLatch; - private AtomicLong haveOffloadEntryNumber; private LedgerReader ledgerReader; - private Semaphore semaphore; - private Recycler.Handle recyclerHandle; + private final Recycler.Handle recyclerHandle; private FileSystemWriter(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; @@ -278,28 +302,19 @@ protected FileSystemWriter newObject(Recycler.Handle handle) { private void recycle() { this.dataWriter = null; - this.countDownLatch = null; - this.haveOffloadEntryNumber = null; this.ledgerReader = null; this.ledgerEntriesOnce = null; - this.semaphore = null; recyclerHandle.recycle(this); } public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter, - Semaphore semaphore, - CountDownLatch countDownLatch, - AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) { FileSystemWriter writer = RECYCLER.get(); writer.ledgerReader = ledgerReader; writer.dataWriter = dataWriter; - writer.countDownLatch = countDownLatch; - writer.haveOffloadEntryNumber = haveOffloadEntryNumber; writer.ledgerEntriesOnce = ledgerEntriesOnce; - writer.semaphore = semaphore; return writer; } @@ -307,9 +322,7 @@ public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, public void run() { String managedLedgerName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME); if (ledgerReader.fileSystemWriteException == null) { - Iterator iterator = ledgerEntriesOnce.iterator(); - while (iterator.hasNext()) { - LedgerEntry entry = iterator.next(); + for (LedgerEntry entry : ledgerEntriesOnce) { long entryId = entry.getEntryId(); key.set(entryId); try { @@ -320,13 +333,10 @@ public void run() { ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName); break; } - haveOffloadEntryNumber.incrementAndGet(); ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName, entry.getLength()); } } - countDownLatch.countDown(); ledgerEntriesOnce.close(); - semaphore.release(); this.recycle(); } } From f9a3130961b3e54a61c42ac44b2a1a61617cdccb Mon Sep 17 00:00:00 2001 From: daojun Date: Sun, 9 Oct 2022 16:32:57 +0800 Subject: [PATCH 07/15] Split FileSystem ledger offload task into small tasks. 
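
This continues the restructuring from the previous patch and applies the same
shape to the blob-store offloader: each task reads one bounded entry range,
writes it, and then schedules a task for the next range instead of looping in
place behind a semaphore and countdown latch. The sketch below only
illustrates that chaining shape; the class and member names (RangeOffloadTask,
writeRange, ENTRIES_PER_READ value) are stand-ins, not the real
Pulsar/BookKeeper types touched by this diff.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative only: mirrors the "read one range, write it, then schedule
    // the next range" pattern used by the partitioned readers in this series.
    public class RangeOffloadTask implements Runnable {
        private static final long ENTRIES_PER_READ = 100;   // stand-in batch size
        private final ExecutorService executor;
        private final long startEntry;
        private final long lastEntry;                        // last entry id of the ledger
        private final CompletableFuture<Void> promise;

        RangeOffloadTask(ExecutorService executor, long startEntry, long lastEntry,
                         CompletableFuture<Void> promise) {
            this.executor = executor;
            this.startEntry = startEntry;
            this.lastEntry = lastEntry;
            this.promise = promise;
        }

        @Override
        public void run() {
            long end = Math.min(startEntry + ENTRIES_PER_READ - 1, lastEntry);
            try {
                writeRange(startEntry, end);                  // stand-in for read + write of one block
            } catch (Exception e) {
                promise.completeExceptionally(e);
                return;
            }
            if (end == lastEntry) {
                promise.complete(null);                       // whole ledger offloaded
            } else {
                // Chain the next bounded range instead of parking this thread on a latch.
                executor.execute(new RangeOffloadTask(executor, end + 1, lastEntry, promise));
            }
        }

        private void writeRange(long from, long to) {
            System.out.printf("offloading entries [%d, %d]%n", from, to);
        }

        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            CompletableFuture<Void> promise = new CompletableFuture<>();
            executor.execute(new RangeOffloadTask(executor, 0, 450, promise));
            promise.join();
            executor.shutdown();
        }
    }

The real LedgerPartitionedReader and BlobStorePartitionedReader in this diff
add latency and offload stats, index/multipart-upload bookkeeping, and error
cleanup on top of this shape.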
--- .../FileSystemManagedLedgerOffloader.java | 43 +++- .../impl/BlobStoreManagedLedgerOffloader.java | 213 +++++++++++------- 2 files changed, 164 insertions(+), 92 deletions(-) diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 80ea9bd06d7ac..10ed04e86ef4f 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -207,8 +207,8 @@ public void run() { long end = Math.min(start + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed()); // Submit the first LedgerPartitionedReader task. - Executor executor = assignmentScheduler.chooseThread(ledgerId); - executor.execute(new LedgerPartitionedReader(this, dataWriter, executor, start, end)); + assignmentScheduler.chooseThread(ledgerId) + .execute(new LedgerPartitionedReader(this, dataWriter, assignmentScheduler, start, end)); } catch (Exception e) { log.error("Exception when get CompletableFuture : ManagerLedgerName: {}, " + "LedgerId: {}, UUID: {} ", topicName, ledgerId, uuid, e); @@ -224,27 +224,39 @@ private static final class LedgerPartitionedReader implements Runnable { private final String topicName; private final LedgerOffloaderStats stats; private final MapFile.Writer dataWriter; - private final Executor executor; + private final OrderedScheduler scheduler; private final LedgerReader ledgerReader; private final CompletableFuture promise; private final long startEntry; private final long endEntry; - public LedgerPartitionedReader(LedgerReader ledgerReader, MapFile.Writer dataWriter, - Executor executor, long startEntry, long endEntry) { + LedgerPartitionedReader(LedgerReader ledgerReader, MapFile.Writer dataWriter, + OrderedScheduler scheduler, long startEntry, long endEntry) { this.ledgerReader = ledgerReader; this.readHandle = ledgerReader.readHandle; this.topicName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME); this.stats = ledgerReader.offloaderStats; this.dataWriter = dataWriter; - this.executor = executor; + this.scheduler = scheduler; this.promise = ledgerReader.promise; this.startEntry = startEntry; this.endEntry = endEntry; } + LedgerPartitionedReader(LedgerPartitionedReader reader, long startEntry, long endEntry) { + this.readHandle = reader.readHandle; + this.topicName = reader.topicName; + this.stats = reader.stats; + this.dataWriter = reader.dataWriter; + this.scheduler = reader.scheduler; + this.ledgerReader = reader.ledgerReader; + this.promise = reader.promise; + this.startEntry = startEntry; + this.endEntry = endEntry; + } + @Override public void run() { log.debug("read ledger entries. start: {}, end: {}", startEntry, endEntry); @@ -256,24 +268,33 @@ public void run() { this.stats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS); // Execute the FileWrite task on the current thread. FileSystemWriter.create(entries, dataWriter, ledgerReader).run(); - currentTaskFinished(); - }, this.executor) + // Do the post process. + this.processAfterTaskFinished(); + }, this.scheduler.chooseThread(readHandle.getId())) .exceptionally(e -> { - log.error(""); + log.error("Read ledger failed. 
Ledger name {}, ledgerId {}, startEntry:{}, endEntry:{}.", + topicName, readHandle.getId(), startEntry, endEntry, e); + promise.completeExceptionally(e); return null; }); } - private void currentTaskFinished() { + private void processAfterTaskFinished() { if (ledgerReader.fileSystemWriteException != null) { promise.completeExceptionally(ledgerReader.fileSystemWriteException); + return; } + // If no more entries to offload, close the file and complete the promise. if (this.endEntry == readHandle.getLastAddConfirmed()) { IOUtils.closeStream(this.dataWriter); promise.complete(null); } else { - // TODO submit next task + // Submit the next task. + long startEntry = endEntry + 1; + long endEntry = Math.min(startEntry + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed()); + this.scheduler.chooseThread(readHandle.getId()) + .execute(new LedgerPartitionedReader(this, startEntry, endEntry)); } } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 606934b5f0e36..e936a41eeca27 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import lombok.AllArgsConstructor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; @@ -173,67 +174,152 @@ public Map getOffloadDriverMetadata() { * Creating indexBlocks for each corresponding DataBlock that is uploaded. 
*/ @Override + @SuppressWarnings("UnstableApiUsage") public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { + final CompletableFuture promise = new CompletableFuture<>(); + if (null == extraMetadata) { + promise.completeExceptionally(new IllegalArgumentException("Argument [extraMetadata] can't be null.")); + return promise; + } + + final long ledgerId = readHandle.getId(); + final String bucket = config.getBucket(); final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME); final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation()); - log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata, + log.info("offload {} uuid {} extraMetadata {} to {} {}", ledgerId, uuid, extraMetadata, config.getBlobStoreLocation(), writeBlobStore); - CompletableFuture promise = new CompletableFuture<>(); - scheduler.chooseThread(readHandle.getId()).execute(() -> { - if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) { - promise.completeExceptionally( - new IllegalArgumentException("An empty or open ledger should never be offloaded")); - return; - } - OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() + + if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) { + promise.completeExceptionally( + new IllegalArgumentException("An empty or open ledger should never be offloaded")); + return promise; + } + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() .withLedgerMetadata(readHandle.getLedgerMetadata()) .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); - String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid); - String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid); - log.info("ledger {} dataBlockKey {} indexBlockKey {}", readHandle.getId(), dataBlockKey, indexBlockKey); + String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(ledgerId, uuid); + String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uuid); + log.info("ledger {} dataBlockKey {} indexBlockKey {}", ledgerId, dataBlockKey, indexBlockKey); - MultipartUpload mpu = null; - List parts = Lists.newArrayList(); + MultipartUpload mpu; + List parts = Lists.newArrayList(); - // init multi part upload for data block. - try { - BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey); + // init multi part upload for data block. + try { + BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey); + Map objectMetadata = new HashMap<>(userMetadata); + objectMetadata.put("role", "data"); + objectMetadata.putAll(extraMetadata); + + DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata); + Blob blob = blobBuilder.build(); + log.info("initiateMultipartUpload bucket {}, metadata {} ", bucket, blob.getMetadata()); + mpu = writeBlobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions()); + } catch (Throwable t) { + promise.completeExceptionally(t); + return promise; + } + + final AtomicLong dataLen = new AtomicLong(0); + final CompletableFuture f = new CompletableFuture<>(); + // Execute the first block task. 
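+        // The partitioned reader fails 'f' on any upload error and is expected to
+        // complete it once every data block part has been uploaded; the index-block
+        // upload below is chained on that future.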
+ final int maxBlockSize = config.getMaxBlockSizeInBytes(); + Runnable firstTask = new BlobStorePartitionedReader(0, maxBlockSize, readHandle, writeBlobStore, mpu, + parts, indexBuilder, f, offloaderStats, topicName, bucket, dataBlockKey, dataLen); + scheduler.chooseThread(ledgerId).execute(firstTask); + + f.thenAccept(__ -> { + // upload index block + try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataLen.get()).build(); + IndexInputStream indexStream = index.toStream()) { + // write the index block + BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey); Map objectMetadata = new HashMap<>(userMetadata); - objectMetadata.put("role", "data"); - if (extraMetadata != null) { - objectMetadata.putAll(extraMetadata); - } + objectMetadata.put("role", "index"); + objectMetadata.putAll(extraMetadata); + DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata); - Blob blob = blobBuilder.build(); - log.info("initiateMultipartUpload bucket {}, metadata {} ", config.getBucket(), blob.getMetadata()); - mpu = writeBlobStore.initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions()); + Payload indexPayload = Payloads.newInputStreamPayload(indexStream); + indexPayload.getContentMetadata().setContentLength((long) indexStream.getStreamSize()); + indexPayload.getContentMetadata().setContentType("application/octet-stream"); + + Blob blob = blobBuilder + .payload(indexPayload) + .contentLength(indexStream.getStreamSize()) + .build(); + writeBlobStore.putBlob(bucket, blob); + promise.complete(null); } catch (Throwable t) { + try { + writeBlobStore.removeBlob(bucket, dataBlockKey); + } catch (Throwable throwable) { + log.error("Failed deleteObject in bucket - {} with key - {}.", bucket, dataBlockKey, throwable); + } + + this.offloaderStats.recordWriteToStorageError(topicName); + this.offloaderStats.recordOffloadError(topicName); promise.completeExceptionally(t); - return; } + }).exceptionally(e -> { + try { + blobStore.abortMultipartUpload(mpu); + } catch (Throwable throwable) { + log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", + bucket, dataBlockKey, mpu.id(), throwable); + } + this.offloaderStats.recordWriteToStorageError(topicName); + this.offloaderStats.recordOffloadError(topicName); + promise.completeExceptionally(e); + return null; + }); - long dataObjectLength = 0; + return promise; + } + + + @AllArgsConstructor + @SuppressWarnings("UnstableApiUsage") + private static final class BlobStorePartitionedReader implements Runnable { + + private long written; + private long startEntry; + private final int maxBlockSize; + private final ReadHandle handle; + private final BlobStore blobStore; + private final MultipartUpload mp; + private final List parts; + private final OffloadIndexBlockBuilder indexBuilder; + private final CompletableFuture f; + private final LedgerOffloaderStats stats; + private final String topicName; + private final String bucket; + private final String dataBlockKey; + private final AtomicLong dataLen; + + @Override + public void run() { // start multi part upload for data block. 
+ int partId = this.parts.size() + 1; + try { long startEntry = 0; - int partId = 1; - long start = System.nanoTime(); + int partId = this.parts.size() + 1; long entryBytesWritten = 0; - while (startEntry <= readHandle.getLastAddConfirmed()) { + while (startEntry <= handle.getLastAddConfirmed()) { int blockSize = BlockAwareSegmentInputStreamImpl - .calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten); + .calculateBlockSize(maxBlockSize, handle, startEntry, entryBytesWritten); try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( - readHandle, startEntry, blockSize, this.offloaderStats, topicName)) { + handle, startEntry, blockSize, this.stats, topicName)) { Payload partPayload = Payloads.newInputStreamPayload(blockStream); partPayload.getContentMetadata().setContentLength((long) blockSize); partPayload.getContentMetadata().setContentType("application/octet-stream"); - parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload)); + parts.add(blobStore.uploadMultipartPart(mp, partId, partPayload)); log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", - config.getBucket(), dataBlockKey, partId, mpu.id()); + bucket, dataBlockKey, partId, mp.id()); indexBuilder.addBlock(startEntry, partId, blockSize); @@ -244,67 +330,32 @@ public CompletableFuture offload(ReadHandle readHandle, break; } entryBytesWritten += blockStream.getBlockEntryBytesCount(); - partId++; - this.offloaderStats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount()); + this.stats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount()); } - dataObjectLength += blockSize; + this.dataLen.addAndGet(blockSize); } - String etag = writeBlobStore.completeMultipartUpload(mpu, parts); - log.info("Ledger {}, upload finished, etag {}", readHandle.getId(), etag); - mpu = null; + String etag = blobStore.completeMultipartUpload(mp, parts); + log.info("Ledger {}, upload finished, etag {}", handle.getId(), etag); } catch (Throwable t) { try { - if (mpu != null) { - writeBlobStore.abortMultipartUpload(mpu); + if (mp != null) { + blobStore.abortMultipartUpload(mp); } } catch (Throwable throwable) { log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", - config.getBucket(), dataBlockKey, mpu.id(), throwable); + bucket, dataBlockKey, mp.id(), throwable); } - this.offloaderStats.recordWriteToStorageError(topicName); - this.offloaderStats.recordOffloadError(topicName); - promise.completeExceptionally(t); - return; + this.stats.recordWriteToStorageError(topicName); + this.stats.recordOffloadError(topicName); + f.completeExceptionally(t); } + } - // upload index block - try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); - IndexInputStream indexStream = index.toStream()) { - // write the index block - BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey); - Map objectMetadata = new HashMap<>(userMetadata); - objectMetadata.put("role", "index"); - if (extraMetadata != null) { - objectMetadata.putAll(extraMetadata); - } - DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata); - Payload indexPayload = Payloads.newInputStreamPayload(indexStream); - indexPayload.getContentMetadata().setContentLength((long) indexStream.getStreamSize()); - indexPayload.getContentMetadata().setContentType("application/octet-stream"); - - Blob blob = blobBuilder - .payload(indexPayload) - .contentLength((long) indexStream.getStreamSize()) - 
.build(); - writeBlobStore.putBlob(config.getBucket(), blob); - promise.complete(null); - } catch (Throwable t) { - try { - writeBlobStore.removeBlob(config.getBucket(), dataBlockKey); - } catch (Throwable throwable) { - log.error("Failed deleteObject in bucket - {} with key - {}.", - config.getBucket(), dataBlockKey, throwable); - } + private void processAfterTaskFinished() { - this.offloaderStats.recordWriteToStorageError(topicName); - this.offloaderStats.recordOffloadError(topicName); - promise.completeExceptionally(t); - return; - } - }); - return promise; + } } BlobStore blobStore; From edf22d4f53bc40ed50b0ddf3009a1b3236d0de17 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 11 Oct 2022 13:22:01 +0800 Subject: [PATCH 08/15] Complete BlobStoreManagedLedgerOffloader Add configurations --- .../mledger/ManagedLedgerConfig.java | 50 +++++++- .../bookkeeper/mledger/OffloadReadHandle.java | 93 +++++++++----- .../mledger/impl/ManagedLedgerImpl.java | 3 +- .../impl/BlobStoreManagedLedgerOffloader.java | 119 +++++++++++------- 4 files changed, 181 insertions(+), 84 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 7487f32f72c68..c4cf41d3707e9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -84,7 +84,9 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; - private long globalOffloadingPermitBytesPerSecond = 0; + private Long managedLedgerOffloadBrokerFlowPermit; + private Map managedLedgerOffloadNamespaceFlowPermit; + private Map managedLedgerOffloadTopicFlowPermit; public boolean isCreateIfMissing() { return createIfMissing; @@ -747,10 +749,10 @@ public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsFo /** * Set permitted size to offload on the broker. * - * @param globalOffloadingPermitBytesPerSecond + * @param managedLedgerOffloadBrokerFlowPermit */ - public void setGlobalOffloadingPermitBytesPerSecond(long globalOffloadingPermitBytesPerSecond) { - this.globalOffloadingPermitBytesPerSecond = globalOffloadingPermitBytesPerSecond; + public void setGlobalOffloadingPermitBytesPerSecond(long managedLedgerOffloadBrokerFlowPermit) { + this.managedLedgerOffloadBrokerFlowPermit = managedLedgerOffloadBrokerFlowPermit; } /** @@ -758,7 +760,43 @@ public void setGlobalOffloadingPermitBytesPerSecond(long globalOffloadingPermitB * * @return */ - public long getGlobalOffloadingPermitBytesPerSecond() { - return globalOffloadingPermitBytesPerSecond; + public long getManagedLedgerOffloadBrokerFlowPermit() { + return managedLedgerOffloadBrokerFlowPermit; + } + + /** + * Set permitted size to offload on the broker. + * + * @param managedLedgerOffloadNamespaceFlowPermit + */ + public void setManagedLedgerOffloadNamespaceFlowPermit(Map managedLedgerOffloadNamespaceFlowPermit) { + this.managedLedgerOffloadNamespaceFlowPermit = managedLedgerOffloadNamespaceFlowPermit; + } + + /** + * Get permitted size to offload on the broker. + * + * @return + */ + public Map getManagedLedgerOffloadNamespaceFlowPermit() { + return managedLedgerOffloadNamespaceFlowPermit; + } + + /** + * Set permitted size to offload on the broker. 
+ * + * @param managedLedgerOffloadTopicFlowPermit + */ + public void setManagedLedgerOffloadTopicFlowPermit(Map managedLedgerOffloadTopicFlowPermit) { + this.managedLedgerOffloadTopicFlowPermit = managedLedgerOffloadTopicFlowPermit; + } + + /** + * Get permitted size to offload on the broker. + * + * @return + */ + public Map getManagedLedgerOffloadTopicFlowPermit() { + return managedLedgerOffloadTopicFlowPermit; } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index 748374bf261e8..1bf967077662a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -18,13 +18,13 @@ */ package org.apache.bookkeeper.mledger; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.LockSupport; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -32,45 +32,80 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.util.TimeWindow; import org.apache.bookkeeper.mledger.util.WindowWrap; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.naming.TopicName; public class OffloadReadHandle implements ReadHandle { private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); - private static volatile long permittedBytes0; - private static volatile TimeWindow window0; - private static final int MAX_PROCESSING_CMDS = 1024; - private static final AtomicInteger PROCESSING_CMDS = new AtomicInteger(0); + private static volatile Long brokerFlowPermit; + private static volatile Map namespaceFlowPermit; + private static volatile Map topicFlowPermit; + private static volatile TimeWindow brokerFlowPermit0; + private static volatile Map> namespaceFlowPermit0; + private static volatile Map> topicFlowPermit0; + + private final String ledgerName; + private final String namespace; private final ReadHandle delegator; private final OrderedScheduler scheduler; + private final long ledgerFlowPermit; + private final TimeWindow window; - private OffloadReadHandle(ReadHandle handle, long permittedBytes, OrderedScheduler scheduler) { + private OffloadReadHandle(ReadHandle handle, String ledgerName, ManagedLedgerConfig config, + OrderedScheduler scheduler) { + initialize(config); this.delegator = handle; + this.ledgerName = ledgerName; + this.namespace = TopicName.get(ledgerName).getNamespace(); + this.scheduler = Objects.requireNonNull(scheduler); + + Pair> pair = getFlowPermitAndController(ledgerName, namespace); + this.ledgerFlowPermit = pair.getLeft(); + this.window = pair.getRight(); + } + + private static void initialize(ManagedLedgerConfig config) { if (INITIALIZED.compareAndSet(false, true)) { - permittedBytes0 = permittedBytes; - window0 = new TimeWindow<>(2, Long.valueOf(TimeUnit.SECONDS.toMillis(1)).intValue()); + brokerFlowPermit = config.getManagedLedgerOffloadBrokerFlowPermit(); + namespaceFlowPermit = config.getManagedLedgerOffloadNamespaceFlowPermit(); + 
topicFlowPermit = config.getManagedLedgerOffloadTopicFlowPermit(); + + brokerFlowPermit0 = new TimeWindow<>(2, 1000); + namespaceFlowPermit0 = new ConcurrentHashMap<>(); + topicFlowPermit0 = new ConcurrentHashMap<>(); } + } - this.scheduler = Objects.requireNonNull(scheduler); + private static Pair> getFlowPermitAndController(String ledgerName, String namespace) { + if (null != topicFlowPermit.get(ledgerName)) { + long permit = topicFlowPermit.get(ledgerName); + TimeWindow window = + topicFlowPermit0.computeIfAbsent(ledgerName, __ -> new TimeWindow<>(2, 1000)); + return Pair.of(permit, window); + } else if (null != namespaceFlowPermit.get(namespace)) { + long permit = namespaceFlowPermit.get(namespace); + TimeWindow window = + namespaceFlowPermit0.computeIfAbsent(namespace, __ -> new TimeWindow<>(2, 1000)); + return Pair.of(permit, window); + } else { + return Pair.of(brokerFlowPermit, brokerFlowPermit0); + } } - public static CompletableFuture create(ReadHandle handle, long permitBytes, OrderedScheduler scheduler) { - return CompletableFuture.completedFuture(new OffloadReadHandle(handle, permitBytes, scheduler)); + public static CompletableFuture create(ReadHandle handle, String ledgerName, + ManagedLedgerConfig config, OrderedScheduler scheduler) { + return CompletableFuture.completedFuture(new OffloadReadHandle(handle, ledgerName, config, scheduler)); } @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { final long delayMills = calculateDelayMillis(); if (delayMills > 0) { - if (PROCESSING_CMDS.get() > MAX_PROCESSING_CMDS) { - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delayMills)); - return readAsync(firstEntry, lastEntry); - } else { - CompletableFuture f = new CompletableFuture<>(); - Runnable cmd = new ReadAsyncCommand(firstEntry, lastEntry, f); - scheduler.schedule(cmd, delayMills, TimeUnit.MILLISECONDS); - PROCESSING_CMDS.incrementAndGet(); - return f; - } + CompletableFuture f = new CompletableFuture<>(); + Runnable cmd = new ReadAsyncCommand(firstEntry, lastEntry, f); + scheduler.schedule(cmd, delayMills, TimeUnit.MILLISECONDS); + return f; } return this.delegator @@ -134,18 +169,18 @@ public LedgerMetadata getLedgerMetadata() { } - private static long calculateDelayMillis() { - if (permittedBytes0 <= 0) { + private long calculateDelayMillis() { + if (ledgerFlowPermit <= 0) { return 0; } - WindowWrap wrap = window0.current(__ -> new AtomicLong(0)); + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return 0; } - if (wrap.value().get() >= permittedBytes0) { + if (wrap.value().get() >= ledgerFlowPermit) { // park until next window start long end = wrap.start() + wrap.interval(); return end - System.currentTimeMillis(); @@ -154,8 +189,8 @@ private static long calculateDelayMillis() { return 0; } - private static void recordReadBytes(LedgerEntries entries) { - if (permittedBytes0 <= 0) { + private void recordReadBytes(LedgerEntries entries) { + if (ledgerFlowPermit <= 0) { return; } @@ -166,7 +201,7 @@ private static void recordReadBytes(LedgerEntries entries) { AtomicLong num = new AtomicLong(0); entries.forEach(en -> num.addAndGet(en.getLength())); - WindowWrap wrap = window0.current(__ -> new AtomicLong(0)); + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java 
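calculateDelayMillis and recordReadBytes together implement a fixed-window byte budget: once the bytes read in the current window reach the configured permit, the next read is pushed back to the start of the following window. A simplified, self-contained model of that idea (a single window tracked with System.currentTimeMillis instead of the TimeWindow/WindowWrap helpers; the class name is made up):

import java.util.concurrent.atomic.AtomicLong;

class WindowedByteThrottle {
    private final long permitBytesPerWindow;
    private final long windowMillis;
    private final AtomicLong bytesInWindow = new AtomicLong();
    private volatile long windowStart = System.currentTimeMillis();

    WindowedByteThrottle(long permitBytesPerWindow, long windowMillis) {
        this.permitBytesPerWindow = permitBytesPerWindow;
        this.windowMillis = windowMillis;
    }

    // Roll over to a new window once the current one has elapsed.
    private synchronized void rollWindowIfNeeded() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {
            windowStart = now;
            bytesInWindow.set(0);
        }
    }

    // How long the caller should wait before the next read; 0 means go ahead.
    long calculateDelayMillis() {
        if (permitBytesPerWindow <= 0) {
            return 0; // throttling disabled
        }
        rollWindowIfNeeded();
        if (bytesInWindow.get() >= permitBytesPerWindow) {
            long windowEnd = windowStart + windowMillis;
            return Math.max(0, windowEnd - System.currentTimeMillis());
        }
        return 0;
    }

    // Charge the bytes actually read against the current window.
    void recordReadBytes(long bytes) {
        if (permitBytesPerWindow <= 0) {
            return;
        }
        rollWindowIfNeeded();
        bytesInWindow.addAndGet(bytes);
    }
}

In OffloadReadHandle the delay is not slept on the calling thread: readAsync re-submits a ReadAsyncCommand to the OrderedScheduler after the computed delay, so offload threads stay free for other work while a ledger waits out its window.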
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a31f7cda58d09..bde9b8083df37 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2990,12 +2990,11 @@ private void offloadLoop(CompletableFuture promise, Queue extraMetadata = Map.of("ManagedLedgerName", name); String driverName = config.getLedgerOffloader().getOffloadDriverName(); - final long flowPermits = config.getGlobalOffloadingPermitBytesPerSecond(); Map driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) .thenCompose((ignore) -> getLedgerHandle(ledgerId)) - .thenCompose(handle -> OffloadReadHandle.create(handle, flowPermits)) + .thenCompose(handle -> OffloadReadHandle.create(handle, name, config, scheduledExecutor)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index e936a41eeca27..ac5bb28f65c8b 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -38,7 +38,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import lombok.AllArgsConstructor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; @@ -226,8 +225,8 @@ public CompletableFuture offload(ReadHandle readHandle, final CompletableFuture f = new CompletableFuture<>(); // Execute the first block task. 
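offloadLoop now decorates the raw ledger handle before the offloader ever sees it, so any offloader that reads through the handle picks up the flow control without changes of its own. The shape of that pipeline, reduced to placeholder types (Handle and ThrottledHandle are stand-ins for illustration, not the real ReadHandle and OffloadReadHandle):

import java.util.concurrent.CompletableFuture;

public class OffloadPipelineSketch {
    interface Handle { }                                  // stands in for ReadHandle

    static class ThrottledHandle implements Handle {      // stands in for OffloadReadHandle
        final Handle inner;
        ThrottledHandle(Handle inner) { this.inner = inner; }
    }

    static CompletableFuture<Handle> getLedgerHandle(long ledgerId) {
        return CompletableFuture.completedFuture(new Handle() { });
    }

    static CompletableFuture<Void> offload(Handle handle) {
        // A real offloader only calls readAsync on the handle it is given,
        // so the throttled decorator is transparent to it.
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> offloadLedger(long ledgerId) {
        return getLedgerHandle(ledgerId)
                .thenCompose(h -> CompletableFuture.<Handle>completedFuture(new ThrottledHandle(h)))
                .thenCompose(OffloadPipelineSketch::offload);
    }

    public static void main(String[] args) {
        offloadLedger(42L).join();
    }
}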
final int maxBlockSize = config.getMaxBlockSizeInBytes(); - Runnable firstTask = new BlobStorePartitionedReader(0, maxBlockSize, readHandle, writeBlobStore, mpu, - parts, indexBuilder, f, offloaderStats, topicName, bucket, dataBlockKey, dataLen); + Runnable firstTask = new BlobStorePartitionedReader(maxBlockSize, readHandle, writeBlobStore, mpu, + parts, indexBuilder, f, offloaderStats, topicName, bucket, dataBlockKey, dataLen, scheduler); scheduler.chooseThread(ledgerId).execute(firstTask); f.thenAccept(__ -> { @@ -279,7 +278,6 @@ public CompletableFuture offload(ReadHandle readHandle, } - @AllArgsConstructor @SuppressWarnings("UnstableApiUsage") private static final class BlobStorePartitionedReader implements Runnable { @@ -297,64 +295,91 @@ private static final class BlobStorePartitionedReader implements Runnable { private final String bucket; private final String dataBlockKey; private final AtomicLong dataLen; + private final OrderedScheduler scheduler; + + BlobStorePartitionedReader(int maxBlockSize, ReadHandle handle, BlobStore blobStore, MultipartUpload mp, + List parts, OffloadIndexBlockBuilder indexBuilder, + CompletableFuture f, LedgerOffloaderStats stats, String topicName, + String bucket, String dataBlockKey, AtomicLong dataLen, + OrderedScheduler scheduler) { + this.startEntry = 0; + this.written = 0; + this.maxBlockSize = maxBlockSize; + this.handle = handle; + this.blobStore = blobStore; + this.mp = mp; + this.parts = parts; + this.indexBuilder = indexBuilder; + this.f = f; + this.stats = stats; + this.topicName = topicName; + this.bucket = bucket; + this.dataBlockKey = dataBlockKey; + this.dataLen = dataLen; + this.scheduler = scheduler; + } + + BlobStorePartitionedReader(BlobStorePartitionedReader r) { + this.startEntry = r.startEntry; + this.written = r.written; + this.maxBlockSize = r.maxBlockSize; + this.handle = r.handle; + this.blobStore = r.blobStore; + this.mp = r.mp; + this.parts = r.parts; + this.indexBuilder = r.indexBuilder; + this.f = r.f; + this.stats = r.stats; + this.topicName = r.topicName; + this.bucket = r.bucket; + this.dataBlockKey = r.dataBlockKey; + this.dataLen = r.dataLen; + this.scheduler = r.scheduler; + } @Override public void run() { // start multi part upload for data block. int partId = this.parts.size() + 1; - try { - long startEntry = 0; - int partId = this.parts.size() + 1; - long entryBytesWritten = 0; - while (startEntry <= handle.getLastAddConfirmed()) { - int blockSize = BlockAwareSegmentInputStreamImpl - .calculateBlockSize(maxBlockSize, handle, startEntry, entryBytesWritten); - - try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( - handle, startEntry, blockSize, this.stats, topicName)) { - - Payload partPayload = Payloads.newInputStreamPayload(blockStream); - partPayload.getContentMetadata().setContentLength((long) blockSize); - partPayload.getContentMetadata().setContentType("application/octet-stream"); - parts.add(blobStore.uploadMultipartPart(mp, partId, partPayload)); - log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", - bucket, dataBlockKey, partId, mp.id()); - - indexBuilder.addBlock(startEntry, partId, blockSize); - - if (blockStream.getEndEntryId() != -1) { - startEntry = blockStream.getEndEntryId() + 1; - } else { - // could not read entry from ledger. 
- break; - } - entryBytesWritten += blockStream.getBlockEntryBytesCount(); - this.stats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount()); - } + int blockSize = BlockAwareSegmentInputStreamImpl + .calculateBlockSize(maxBlockSize, handle, startEntry, written); - this.dataLen.addAndGet(blockSize); - } + try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( + handle, startEntry, blockSize, this.stats, topicName)) { - String etag = blobStore.completeMultipartUpload(mp, parts); - log.info("Ledger {}, upload finished, etag {}", handle.getId(), etag); - } catch (Throwable t) { - try { - if (mp != null) { - blobStore.abortMultipartUpload(mp); + Payload partPayload = Payloads.newInputStreamPayload(blockStream); + partPayload.getContentMetadata().setContentLength((long) blockSize); + partPayload.getContentMetadata().setContentType("application/octet-stream"); + parts.add(blobStore.uploadMultipartPart(mp, partId, partPayload)); + log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", + bucket, dataBlockKey, partId, mp.id()); + + indexBuilder.addBlock(startEntry, partId, blockSize); + + if (blockStream.getEndEntryId() != -1) { + startEntry = blockStream.getEndEntryId() + 1; + written += blockStream.getBlockEntryBytesCount(); + this.stats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount()); + this.dataLen.addAndGet(blockSize); } - } catch (Throwable throwable) { - log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", - bucket, dataBlockKey, mp.id(), throwable); } - this.stats.recordWriteToStorageError(topicName); - this.stats.recordOffloadError(topicName); + this.processAfterTaskFinished(); + } catch (Throwable t) { + log.error("Blob store offload failed. 
LedgerId {}, LedgerName {}, PartId {}, StartEntry {}.", + handle.getId(), topicName, partId, startEntry, t); f.completeExceptionally(t); } } private void processAfterTaskFinished() { - + if (!(startEntry <= handle.getLastAddConfirmed())) { + String etag = blobStore.completeMultipartUpload(mp, parts); + log.info("Ledger {}, upload finished, etag {}", handle.getId(), etag); + f.complete(null); + } else { + scheduler.chooseThread(handle.getId()).execute(new BlobStorePartitionedReader(this)); + } } } From 0bbc195ffb860f2d604bad289e99f9f2e7c9d4a6 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 24 Nov 2022 16:27:31 +0800 Subject: [PATCH 09/15] FIX --- .../mledger/ManagedLedgerConfig.java | 37 ----------- .../bookkeeper/mledger/OffloadReadHandle.java | 66 ++++--------------- .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../mledger/OffloadReadHandleTest.java | 7 +- 4 files changed, 20 insertions(+), 92 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index c4cf41d3707e9..a49e57a50c793 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,8 +85,6 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; private Long managedLedgerOffloadBrokerFlowPermit; - private Map managedLedgerOffloadNamespaceFlowPermit; - private Map managedLedgerOffloadTopicFlowPermit; public boolean isCreateIfMissing() { return createIfMissing; @@ -764,39 +762,4 @@ public long getManagedLedgerOffloadBrokerFlowPermit() { return managedLedgerOffloadBrokerFlowPermit; } - /** - * Set permitted size to offload on the broker. - * - * @param managedLedgerOffloadNamespaceFlowPermit - */ - public void setManagedLedgerOffloadNamespaceFlowPermit(Map managedLedgerOffloadNamespaceFlowPermit) { - this.managedLedgerOffloadNamespaceFlowPermit = managedLedgerOffloadNamespaceFlowPermit; - } - - /** - * Get permitted size to offload on the broker. - * - * @return - */ - public Map getManagedLedgerOffloadNamespaceFlowPermit() { - return managedLedgerOffloadNamespaceFlowPermit; - } - - /** - * Set permitted size to offload on the broker. - * - * @param managedLedgerOffloadTopicFlowPermit - */ - public void setManagedLedgerOffloadTopicFlowPermit(Map managedLedgerOffloadTopicFlowPermit) { - this.managedLedgerOffloadTopicFlowPermit = managedLedgerOffloadTopicFlowPermit; - } - - /** - * Get permitted size to offload on the broker. 
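With processAfterTaskFinished filled in, each run() call uploads exactly one block and then either completes the shared future or re-submits a copy of itself, so no scheduler thread is held for the duration of the whole ledger. The pattern in isolation, with the BookKeeper and jclouds types stripped out and a fixed number of entries per block assumed:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SelfReschedulingUploadSketch implements Runnable {
    private static final int ENTRIES_PER_BLOCK = 100;     // assumption: fixed-size blocks

    private final long lastEntry;                         // stands in for getLastAddConfirmed()
    private final ExecutorService scheduler;
    private final CompletableFuture<Void> done;
    private long startEntry;

    SelfReschedulingUploadSketch(long startEntry, long lastEntry,
                                 ExecutorService scheduler, CompletableFuture<Void> done) {
        this.startEntry = startEntry;
        this.lastEntry = lastEntry;
        this.scheduler = scheduler;
        this.done = done;
    }

    @Override
    public void run() {
        try {
            // "Upload" one block worth of entries, then advance the cursor.
            long blockEnd = Math.min(startEntry + ENTRIES_PER_BLOCK - 1, lastEntry);
            startEntry = blockEnd + 1;

            if (startEntry > lastEntry) {
                done.complete(null);                       // last block: finish the upload
            } else {
                // The patch re-submits a fresh copy of the task; re-submitting the
                // mutated task behaves the same way in this single-threaded sketch.
                scheduler.execute(this);
            }
        } catch (Throwable t) {
            done.completeExceptionally(t);
        }
    }

    public static void main(String[] args) {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduler.execute(new SelfReschedulingUploadSketch(0, 1_000, scheduler, done));
        done.join();
        scheduler.shutdown();
    }
}

Submitting every block through scheduler.chooseThread(handle.getId()) keeps the blocks of one ledger strictly ordered while still letting different ledgers interleave on the scheduler.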
- * - * @return - */ - public Map getManagedLedgerOffloadTopicFlowPermit() { - return managedLedgerOffloadTopicFlowPermit; - } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index 1bf967077662a..0f04c68163151 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -18,10 +18,8 @@ */ package org.apache.bookkeeper.mledger; -import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -32,70 +30,32 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.util.TimeWindow; import org.apache.bookkeeper.mledger.util.WindowWrap; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.common.naming.TopicName; public class OffloadReadHandle implements ReadHandle { private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); - private static volatile Long brokerFlowPermit; - private static volatile Map namespaceFlowPermit; - private static volatile Map topicFlowPermit; - private static volatile TimeWindow brokerFlowPermit0; - private static volatile Map> namespaceFlowPermit0; - private static volatile Map> topicFlowPermit0; + private static volatile Long flowPermits; + private static volatile TimeWindow flowPermitsWindow; - - private final String ledgerName; - private final String namespace; private final ReadHandle delegator; private final OrderedScheduler scheduler; - private final long ledgerFlowPermit; - private final TimeWindow window; - private OffloadReadHandle(ReadHandle handle, String ledgerName, ManagedLedgerConfig config, + private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config, OrderedScheduler scheduler) { initialize(config); this.delegator = handle; - this.ledgerName = ledgerName; - this.namespace = TopicName.get(ledgerName).getNamespace(); this.scheduler = Objects.requireNonNull(scheduler); - - Pair> pair = getFlowPermitAndController(ledgerName, namespace); - this.ledgerFlowPermit = pair.getLeft(); - this.window = pair.getRight(); } private static void initialize(ManagedLedgerConfig config) { if (INITIALIZED.compareAndSet(false, true)) { - brokerFlowPermit = config.getManagedLedgerOffloadBrokerFlowPermit(); - namespaceFlowPermit = config.getManagedLedgerOffloadNamespaceFlowPermit(); - topicFlowPermit = config.getManagedLedgerOffloadTopicFlowPermit(); - - brokerFlowPermit0 = new TimeWindow<>(2, 1000); - namespaceFlowPermit0 = new ConcurrentHashMap<>(); - topicFlowPermit0 = new ConcurrentHashMap<>(); - } - } - - private static Pair> getFlowPermitAndController(String ledgerName, String namespace) { - if (null != topicFlowPermit.get(ledgerName)) { - long permit = topicFlowPermit.get(ledgerName); - TimeWindow window = - topicFlowPermit0.computeIfAbsent(ledgerName, __ -> new TimeWindow<>(2, 1000)); - return Pair.of(permit, window); - } else if (null != namespaceFlowPermit.get(namespace)) { - long permit = namespaceFlowPermit.get(namespace); - TimeWindow window = - namespaceFlowPermit0.computeIfAbsent(namespace, __ -> new TimeWindow<>(2, 1000)); - return Pair.of(permit, window); - } else { - return Pair.of(brokerFlowPermit, 
brokerFlowPermit0); + flowPermits = config.getManagedLedgerOffloadBrokerFlowPermit(); + flowPermitsWindow = new TimeWindow<>(2, 1000); } } - public static CompletableFuture create(ReadHandle handle, String ledgerName, - ManagedLedgerConfig config, OrderedScheduler scheduler) { - return CompletableFuture.completedFuture(new OffloadReadHandle(handle, ledgerName, config, scheduler)); + public static CompletableFuture create(ReadHandle handle, ManagedLedgerConfig config, + OrderedScheduler scheduler) { + return CompletableFuture.completedFuture(new OffloadReadHandle(handle, config, scheduler)); } @Override @@ -170,17 +130,17 @@ public LedgerMetadata getLedgerMetadata() { private long calculateDelayMillis() { - if (ledgerFlowPermit <= 0) { + if (flowPermits <= 0) { return 0; } - WindowWrap wrap = window.current(__ -> new AtomicLong(0)); + WindowWrap wrap = flowPermitsWindow.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return 0; } - if (wrap.value().get() >= ledgerFlowPermit) { + if (wrap.value().get() >= flowPermits) { // park until next window start long end = wrap.start() + wrap.interval(); return end - System.currentTimeMillis(); @@ -190,7 +150,7 @@ private long calculateDelayMillis() { } private void recordReadBytes(LedgerEntries entries) { - if (ledgerFlowPermit <= 0) { + if (flowPermits <= 0) { return; } @@ -201,7 +161,7 @@ private void recordReadBytes(LedgerEntries entries) { AtomicLong num = new AtomicLong(0); entries.forEach(en -> num.addAndGet(en.getLength())); - WindowWrap wrap = window.current(__ -> new AtomicLong(0)); + WindowWrap wrap = flowPermitsWindow.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index bde9b8083df37..bf60c8d72f276 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2994,7 +2994,7 @@ private void offloadLoop(CompletableFuture promise, Queue getLedgerHandle(ledgerId)) - .thenCompose(handle -> OffloadReadHandle.create(handle, name, config, scheduledExecutor)) + .thenCompose(handle -> OffloadReadHandle.create(handle, config, scheduledExecutor)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java index 46c684fea0910..16be66f209ad0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -47,7 +48,11 @@ public void test() throws Exception { Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle).readAsync(1, 1); long start = 
System.currentTimeMillis(); - CompletableFuture future = OffloadReadHandle.create(handle, 100); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setGlobalOffloadingPermitBytesPerSecond(100L); + + CompletableFuture future = OffloadReadHandle.create(handle, config, + OrderedScheduler.newSchedulerBuilder().numThreads(2).build()); ReadHandle h = future.get(); h.readAsync(1, 1); h.readAsync(1, 1); From 8ca776c58afd154302c6464129e8e60246daa8d1 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 24 Nov 2022 17:09:46 +0800 Subject: [PATCH 10/15] FIX --- .../java/org/apache/bookkeeper/mledger/OffloadReadHandle.java | 2 +- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- .../org/apache/bookkeeper/mledger/OffloadReadHandleTest.java | 2 +- .../filesystem/impl/FileSystemManagedLedgerOffloader.java | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index 0f04c68163151..99b4b614970ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3d4227d77b0a5..7926f339b6832 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -94,7 +94,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.OffloadReadHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -113,6 +112,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.OffloadReadHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java index 16be66f209ad0..9dc848c795563 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
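The updated test drives the wrapped handle through a mocked ReadHandle and a real OrderedScheduler and then checks how long back-to-back reads take under a 100-byte budget. The same assertion, written against the WindowedByteThrottle sketch shown earlier and using nothing but the JDK (the class name is made up):

public class WindowedByteThrottleTest {
    public static void main(String[] args) throws InterruptedException {
        // 100-byte budget per one-second window, mirroring the 100L permit set in the test.
        WindowedByteThrottle throttle = new WindowedByteThrottle(100, 1_000);

        throttle.recordReadBytes(120);                 // the first read exhausts the budget
        long delay = throttle.calculateDelayMillis();  // so the second read has to wait
        if (delay <= 0) {
            throw new AssertionError("expected the second read to be delayed");
        }
        System.out.println("second read delayed by " + delay + " ms");

        Thread.sleep(delay + 10);                      // wait out the current window
        if (throttle.calculateDelayMillis() != 0) {
            throw new AssertionError("the budget should reset when the window rolls over");
        }
    }
}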
See the NOTICE file * distributed with this work for additional information diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 143552df6c1f6..1704dd40b9ad3 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -25,7 +25,8 @@ import java.io.IOException; import java.util.Map; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; From f3266cb6f6b66a13550ac2a42768085484500f33 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 24 Nov 2022 17:21:17 +0800 Subject: [PATCH 11/15] FIX --- .../mledger/ManagedLedgerConfig.java | 10 +- .../bookkeeper/mledger/OffloadReadHandle.java | 10 +- .../mledger/OffloadReadHandleTest.java | 2 +- .../pulsar/broker/ServiceConfiguration.java | 4 +- .../pulsar/broker/service/BrokerService.java | 4 +- .../FileSystemManagedLedgerOffloader.java | 156 ++++------ .../impl/BlobStoreManagedLedgerOffloader.java | 274 +++++++----------- 7 files changed, 176 insertions(+), 284 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 581ed8ac2c347..d9130b08e211b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,7 +85,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; - private Long managedLedgerOffloadBrokerFlowPermit; + private Long managedLedgerOffloadFlowPermitsPerSecond; public boolean isCreateIfMissing() { return createIfMissing; @@ -756,8 +756,8 @@ public String getShadowSource() { * * @param managedLedgerOffloadBrokerFlowPermit */ - public void setGlobalOffloadingPermitBytesPerSecond(long managedLedgerOffloadBrokerFlowPermit) { - this.managedLedgerOffloadBrokerFlowPermit = managedLedgerOffloadBrokerFlowPermit; + public void setManagedLedgerOffloadFlowPermitsPerSecond(long managedLedgerOffloadBrokerFlowPermit) { + this.managedLedgerOffloadFlowPermitsPerSecond = managedLedgerOffloadBrokerFlowPermit; } /** @@ -765,8 +765,8 @@ public void setGlobalOffloadingPermitBytesPerSecond(long managedLedgerOffloadBro * * @return */ - public long getManagedLedgerOffloadBrokerFlowPermit() { - return managedLedgerOffloadBrokerFlowPermit; + public long getManagedLedgerOffloadFlowPermitsPerSecond() { + return managedLedgerOffloadFlowPermitsPerSecond; } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java index 99b4b614970ee..5b57cc5701e61 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java +++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -34,7 +34,7 @@ public class OffloadReadHandle implements ReadHandle { private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); private static volatile Long flowPermits; - private static volatile TimeWindow flowPermitsWindow; + private static volatile TimeWindow window; private final ReadHandle delegator; private final OrderedScheduler scheduler; @@ -48,8 +48,8 @@ private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config, private static void initialize(ManagedLedgerConfig config) { if (INITIALIZED.compareAndSet(false, true)) { - flowPermits = config.getManagedLedgerOffloadBrokerFlowPermit(); - flowPermitsWindow = new TimeWindow<>(2, 1000); + flowPermits = config.getManagedLedgerOffloadFlowPermitsPerSecond(); + window = new TimeWindow<>(2, 1000); } } @@ -134,7 +134,7 @@ private long calculateDelayMillis() { return 0; } - WindowWrap wrap = flowPermitsWindow.current(__ -> new AtomicLong(0)); + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return 0; @@ -161,7 +161,7 @@ private void recordReadBytes(LedgerEntries entries) { AtomicLong num = new AtomicLong(0); entries.forEach(en -> num.addAndGet(en.getLength())); - WindowWrap wrap = flowPermitsWindow.current(__ -> new AtomicLong(0)); + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); if (wrap == null) { // it should never goes here return; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java index 9dc848c795563..933bf1417b1f7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -49,7 +49,7 @@ public void test() throws Exception { long start = System.currentTimeMillis(); ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setGlobalOffloadingPermitBytesPerSecond(100L); + config.setManagedLedgerOffloadFlowPermitsPerSecond(100L); CompletableFuture future = OffloadReadHandle.create(handle, config, OrderedScheduler.newSchedulerBuilder().numThreads(2).build()); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d9fd1fef91b04..507c3ab3908e8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1976,9 +1976,9 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se private long managedLedgerOffloadThresholdInSeconds = -1L; @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, - doc = "The number of bytes permitted to offload on this broker" + doc = "The number of bytes permitted per second to offload on this broker" ) - private long managedLedgerGlobalOffloadingPermitBytesPerSecond = 0L; + private long managedLedgerOffloadFlowPermitsPerSecond = -1; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a cursor ledger" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 20eeee48b017e..acd2027db21b8 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1750,8 +1750,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); - managedLedgerConfig.setGlobalOffloadingPermitBytesPerSecond( - serviceConfig.getManagedLedgerGlobalOffloadingPermitBytesPerSecond()); + managedLedgerConfig.setManagedLedgerOffloadFlowPermitsPerSecond( + serviceConfig.getManagedLedgerOffloadFlowPermitsPerSecond()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 1704dd40b9ad3..56796ec8a310f 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -23,10 +23,14 @@ import com.google.common.collect.ImmutableMap; import io.netty.util.Recycler; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -140,14 +144,15 @@ public Map getOffloadDriverMetadata() { } /* - * ledgerMetadata stored in an index of -1 - * */ + * ledgerMetadata stored in an index of -1 + * */ @Override public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { CompletableFuture promise = new CompletableFuture<>(); scheduler.chooseThread(readHandle.getId()).execute( new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, - assignmentScheduler, this.offloaderStats)); + assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(), + this.offloaderStats)); return promise; } @@ -160,7 +165,8 @@ private static class LedgerReader implements Runnable { private final String storageBasePath; private final Configuration configuration; volatile Exception fileSystemWriteException = null; - private final OrderedScheduler assignmentScheduler; + private OrderedScheduler assignmentScheduler; + private int managedLedgerOffloadPrefetchRounds = 1; private final LedgerOffloaderStats offloaderStats; private LedgerReader(ReadHandle readHandle, @@ -170,6 +176,7 @@ private LedgerReader(ReadHandle readHandle, String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, + int managedLedgerOffloadPrefetchRounds, LedgerOffloaderStats offloaderStats) { this.readHandle = readHandle; this.uuid = uuid; @@ -178,6 +185,7 @@ private LedgerReader(ReadHandle readHandle, this.storageBasePath = storageBasePath; this.configuration = configuration; this.assignmentScheduler = 
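The BrokerService change above is what turns the broker.conf setting into per-topic behaviour: the broker-wide value is copied into every ManagedLedgerConfig, and OffloadReadHandle reads it from there on first use. A small sketch of that wiring (it assumes the usual Lombok-generated setter on ServiceConfiguration; values less than or equal to zero leave offload reads unthrottled):

import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ServiceConfiguration;

public class OffloadThrottleWiringSketch {
    public static void main(String[] args) {
        // Equivalent of setting managedLedgerOffloadFlowPermitsPerSecond=10485760 in broker.conf.
        ServiceConfiguration serviceConfig = new ServiceConfiguration();
        serviceConfig.setManagedLedgerOffloadFlowPermitsPerSecond(10L * 1024 * 1024);

        // BrokerService copies the broker-wide value into each topic's ledger config.
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setManagedLedgerOffloadFlowPermitsPerSecond(
                serviceConfig.getManagedLedgerOffloadFlowPermitsPerSecond());

        System.out.println("offload flow permits/s: "
                + managedLedgerConfig.getManagedLedgerOffloadFlowPermitsPerSecond());
    }
}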
assignmentScheduler; + this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds; this.offloaderStats = offloaderStats; } @@ -204,102 +212,45 @@ public void run() { byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata()); value.set(ledgerMetadata, 0, ledgerMetadata.length); dataWriter.append(key, value); - long start = 0; - long end = Math.min(start + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed()); - - // Submit the first LedgerPartitionedReader task. - assignmentScheduler.chooseThread(ledgerId) - .execute(new LedgerPartitionedReader(this, dataWriter, assignmentScheduler, start, end)); + AtomicLong haveOffloadEntryNumber = new AtomicLong(0); + long needToOffloadFirstEntryNumber = 0; + CountDownLatch countDownLatch; + //avoid prefetch too much data into memory + Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds); + do { + long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, + readHandle.getLastAddConfirmed()); + log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end); + long startReadTime = System.nanoTime(); + LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get(); + long cost = System.nanoTime() - startReadTime; + this.offloaderStats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS); + semaphore.acquire(); + countDownLatch = new CountDownLatch(1); + assignmentScheduler.chooseThread(ledgerId) + .execute(FileSystemWriter.create(ledgerEntriesOnce, + dataWriter, semaphore, countDownLatch, haveOffloadEntryNumber, this)); + needToOffloadFirstEntryNumber = end + 1; + } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() + && fileSystemWriteException == null); + countDownLatch.await(); + if (fileSystemWriteException != null) { + throw fileSystemWriteException; + } + IOUtils.closeStream(dataWriter); + promise.complete(null); } catch (Exception e) { log.error("Exception when get CompletableFuture : ManagerLedgerName: {}, " + "LedgerId: {}, UUID: {} ", topicName, ledgerId, uuid, e); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } this.offloaderStats.recordOffloadError(topicName); promise.completeExceptionally(e); } } } - private static final class LedgerPartitionedReader implements Runnable { - - private final ReadHandle readHandle; - private final String topicName; - private final LedgerOffloaderStats stats; - private final MapFile.Writer dataWriter; - private final OrderedScheduler scheduler; - private final LedgerReader ledgerReader; - private final CompletableFuture promise; - - private final long startEntry; - private final long endEntry; - - LedgerPartitionedReader(LedgerReader ledgerReader, MapFile.Writer dataWriter, - OrderedScheduler scheduler, long startEntry, long endEntry) { - this.ledgerReader = ledgerReader; - this.readHandle = ledgerReader.readHandle; - this.topicName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME); - this.stats = ledgerReader.offloaderStats; - this.dataWriter = dataWriter; - this.scheduler = scheduler; - this.promise = ledgerReader.promise; - - this.startEntry = startEntry; - this.endEntry = endEntry; - } - - LedgerPartitionedReader(LedgerPartitionedReader reader, long startEntry, long endEntry) { - this.readHandle = reader.readHandle; - this.topicName = reader.topicName; - this.stats = reader.stats; - this.dataWriter = reader.dataWriter; - this.scheduler = reader.scheduler; - this.ledgerReader = reader.ledgerReader; - 
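This revision returns the filesystem offloader to a read-ahead loop whose memory use is capped by a Semaphore: the reader must acquire a permit before submitting each batch to the writer task, the writer releases the permit once the batch is flushed, and a CountDownLatch marks the final batch. The same bounding, modelled with plain JDK types:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BoundedPrefetchSketch {
    public static void main(String[] args) throws InterruptedException {
        int prefetchRounds = 2;              // stands in for managedLedgerOffloadPrefetchRounds
        int batches = 10;                    // pretend the ledger yields ten read batches
        Semaphore inFlight = new Semaphore(prefetchRounds);
        ExecutorService writer = Executors.newSingleThreadExecutor();
        CountDownLatch lastBatchDone = new CountDownLatch(1);

        for (int batch = 0; batch < batches; batch++) {
            inFlight.acquire();              // block the reader once too many batches are queued
            final int current = batch;
            final boolean last = batch == batches - 1;
            writer.execute(() -> {
                try {
                    System.out.println("writing batch " + current);
                } finally {
                    inFlight.release();      // let the reader fetch the next batch
                    if (last) {
                        lastBatchDone.countDown();
                    }
                }
            });
        }

        lastBatchDone.await();               // mirrors countDownLatch.await() after the read loop
        writer.shutdown();
    }
}

Only the latch created for the final batch is awaited, which is enough because every batch of a ledger is executed on the same scheduler thread in submission order.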
this.promise = reader.promise; - this.startEntry = startEntry; - this.endEntry = endEntry; - } - - @Override - public void run() { - log.debug("read ledger entries. start: {}, end: {}", startEntry, endEntry); - long startReadTime = System.nanoTime(); - - readHandle.readAsync(startEntry, endEntry) - .thenAcceptAsync(entries -> { - long cost = System.nanoTime() - startReadTime; - this.stats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS); - // Execute the FileWrite task on the current thread. - FileSystemWriter.create(entries, dataWriter, ledgerReader).run(); - // Do the post process. - this.processAfterTaskFinished(); - }, this.scheduler.chooseThread(readHandle.getId())) - .exceptionally(e -> { - log.error("Read ledger failed. Ledger name {}, ledgerId {}, startEntry:{}, endEntry:{}.", - topicName, readHandle.getId(), startEntry, endEntry, e); - promise.completeExceptionally(e); - return null; - }); - } - - private void processAfterTaskFinished() { - if (ledgerReader.fileSystemWriteException != null) { - promise.completeExceptionally(ledgerReader.fileSystemWriteException); - return; - } - - // If no more entries to offload, close the file and complete the promise. - if (this.endEntry == readHandle.getLastAddConfirmed()) { - IOUtils.closeStream(this.dataWriter); - promise.complete(null); - } else { - // Submit the next task. - long startEntry = endEntry + 1; - long endEntry = Math.min(startEntry + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed()); - this.scheduler.chooseThread(readHandle.getId()) - .execute(new LedgerPartitionedReader(this, startEntry, endEntry)); - } - } - } - private static class FileSystemWriter implements Runnable { private LedgerEntries ledgerEntriesOnce; @@ -308,8 +259,11 @@ private static class FileSystemWriter implements Runnable { private final BytesWritable value = new BytesWritable(); private MapFile.Writer dataWriter; + private CountDownLatch countDownLatch; + private AtomicLong haveOffloadEntryNumber; private LedgerReader ledgerReader; - private final Recycler.Handle recyclerHandle; + private Semaphore semaphore; + private Recycler.Handle recyclerHandle; private FileSystemWriter(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; @@ -324,19 +278,28 @@ protected FileSystemWriter newObject(Recycler.Handle handle) { private void recycle() { this.dataWriter = null; + this.countDownLatch = null; + this.haveOffloadEntryNumber = null; this.ledgerReader = null; this.ledgerEntriesOnce = null; + this.semaphore = null; recyclerHandle.recycle(this); } public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter, + Semaphore semaphore, + CountDownLatch countDownLatch, + AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) { FileSystemWriter writer = RECYCLER.get(); writer.ledgerReader = ledgerReader; writer.dataWriter = dataWriter; + writer.countDownLatch = countDownLatch; + writer.haveOffloadEntryNumber = haveOffloadEntryNumber; writer.ledgerEntriesOnce = ledgerEntriesOnce; + writer.semaphore = semaphore; return writer; } @@ -344,7 +307,9 @@ public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, public void run() { String managedLedgerName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME); if (ledgerReader.fileSystemWriteException == null) { - for (LedgerEntry entry : ledgerEntriesOnce) { + Iterator iterator = ledgerEntriesOnce.iterator(); + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); long entryId = entry.getEntryId(); key.set(entryId); try { @@ 
-355,10 +320,13 @@ public void run() { ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName); break; } + haveOffloadEntryNumber.incrementAndGet(); ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName, entry.getLength()); } } + countDownLatch.countDown(); ledgerEntriesOnce.close(); + semaphore.release(); this.recycle(); } } @@ -426,4 +394,4 @@ public void close() { } } } -} +} \ No newline at end of file diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index e3346a1002f2f..ce75335f5804c 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -173,72 +173,112 @@ public Map getOffloadDriverMetadata() { * Creating indexBlocks for each corresponding DataBlock that is uploaded. */ @Override - @SuppressWarnings("UnstableApiUsage") public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { - final CompletableFuture promise = new CompletableFuture<>(); - if (null == extraMetadata) { - promise.completeExceptionally(new IllegalArgumentException("Argument [extraMetadata] can't be null.")); - return promise; - } - - final long ledgerId = readHandle.getId(); - final String bucket = config.getBucket(); final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME); final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation()); - log.info("offload {} uuid {} extraMetadata {} to {} {}", ledgerId, uuid, extraMetadata, + log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata, config.getBlobStoreLocation(), writeBlobStore); + CompletableFuture promise = new CompletableFuture<>(); + scheduler.chooseThread(readHandle.getId()).execute(() -> { + if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) { + promise.completeExceptionally( + new IllegalArgumentException("An empty or open ledger should never be offloaded")); + return; + } + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() + .withLedgerMetadata(readHandle.getLedgerMetadata()) + .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); + String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid); + String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid); + log.info("ledger {} dataBlockKey {} indexBlockKey {}", readHandle.getId(), dataBlockKey, indexBlockKey); - if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) { - promise.completeExceptionally( - new IllegalArgumentException("An empty or open ledger should never be offloaded")); - return promise; - } - OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() - .withLedgerMetadata(readHandle.getLedgerMetadata()) - .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); - String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(ledgerId, uuid); - String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uuid); - log.info("ledger {} dataBlockKey {} indexBlockKey {}", ledgerId, dataBlockKey, indexBlockKey); + MultipartUpload mpu = null; + 
List parts = Lists.newArrayList(); - MultipartUpload mpu; - List parts = Lists.newArrayList(); + // init multi part upload for data block. + try { + BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey); + Map objectMetadata = new HashMap<>(userMetadata); + objectMetadata.put("role", "data"); + if (extraMetadata != null) { + objectMetadata.putAll(extraMetadata); + } + DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata); + Blob blob = blobBuilder.build(); + log.info("initiateMultipartUpload bucket {}, metadata {} ", config.getBucket(), blob.getMetadata()); + mpu = writeBlobStore.initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions()); + } catch (Throwable t) { + promise.completeExceptionally(t); + return; + } - // init multi part upload for data block. - try { - BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey); - Map objectMetadata = new HashMap<>(userMetadata); - objectMetadata.put("role", "data"); - objectMetadata.putAll(extraMetadata); - - DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata); - Blob blob = blobBuilder.build(); - log.info("initiateMultipartUpload bucket {}, metadata {} ", bucket, blob.getMetadata()); - mpu = writeBlobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions()); - } catch (Throwable t) { - promise.completeExceptionally(t); - return promise; - } + long dataObjectLength = 0; + // start multi part upload for data block. + try { + long startEntry = 0; + int partId = 1; + long start = System.nanoTime(); + long entryBytesWritten = 0; + while (startEntry <= readHandle.getLastAddConfirmed()) { + int blockSize = BlockAwareSegmentInputStreamImpl + .calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten); + + try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( + readHandle, startEntry, blockSize, this.offloaderStats, topicName)) { + + Payload partPayload = Payloads.newInputStreamPayload(blockStream); + partPayload.getContentMetadata().setContentLength((long) blockSize); + partPayload.getContentMetadata().setContentType("application/octet-stream"); + parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload)); + log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", + config.getBucket(), dataBlockKey, partId, mpu.id()); + + indexBuilder.addBlock(startEntry, partId, blockSize); + + if (blockStream.getEndEntryId() != -1) { + startEntry = blockStream.getEndEntryId() + 1; + } else { + // could not read entry from ledger. + break; + } + entryBytesWritten += blockStream.getBlockEntryBytesCount(); + partId++; + this.offloaderStats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount()); + } - final AtomicLong dataLen = new AtomicLong(0); - final CompletableFuture f = new CompletableFuture<>(); - // Execute the first block task. 
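Each iteration of the restored loop uploads one block and records its first entry id, multipart part id, and block size through indexBuilder.addBlock, which is what later lets a read locate the block holding a given entry. A toy version of that bookkeeping, with a made-up BlockInfo record and a fixed entry size standing in for calculateBlockSize:

import java.util.ArrayList;
import java.util.List;

public class IndexBookkeepingSketch {
    record BlockInfo(long firstEntryId, int partId, int blockSize) { }

    // Pretend every block holds entriesPerBlock entries of entrySize bytes each.
    static List<BlockInfo> planBlocks(long lastEntryId, int entriesPerBlock, int entrySize) {
        List<BlockInfo> index = new ArrayList<>();
        long startEntry = 0;
        int partId = 1;
        while (startEntry <= lastEntryId) {
            long lastInBlock = Math.min(startEntry + entriesPerBlock - 1, lastEntryId);
            int blockSize = (int) (lastInBlock - startEntry + 1) * entrySize;
            index.add(new BlockInfo(startEntry, partId, blockSize));   // what addBlock records
            startEntry = lastInBlock + 1;
            partId++;
        }
        return index;
    }

    public static void main(String[] args) {
        planBlocks(10, 4, 128).forEach(System.out::println);
    }
}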
-        final int maxBlockSize = config.getMaxBlockSizeInBytes();
-        Runnable firstTask = new BlobStorePartitionedReader(maxBlockSize, readHandle, writeBlobStore, mpu,
-                parts, indexBuilder, f, offloaderStats, topicName, bucket, dataBlockKey, dataLen, scheduler);
-        scheduler.chooseThread(ledgerId).execute(firstTask);
+                    dataObjectLength += blockSize;
+                }
+
+                String etag = writeBlobStore.completeMultipartUpload(mpu, parts);
+                log.info("Ledger {}, upload finished, etag {}", readHandle.getId(), etag);
+                mpu = null;
+            } catch (Throwable t) {
+                try {
+                    if (mpu != null) {
+                        writeBlobStore.abortMultipartUpload(mpu);
+                    }
+                } catch (Throwable throwable) {
+                    log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
+                            config.getBucket(), dataBlockKey, mpu.id(), throwable);
+                }
+                this.offloaderStats.recordWriteToStorageError(topicName);
+                this.offloaderStats.recordOffloadError(topicName);
+                promise.completeExceptionally(t);
+                return;
+            }
-        f.thenAccept(__ -> {
             // upload index block
-            try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataLen.get()).build();
+            try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                  IndexInputStream indexStream = index.toStream()) {
                 // write the index block
                 BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
                 Map objectMetadata = new HashMap<>(userMetadata);
                 objectMetadata.put("role", "index");
-                objectMetadata.putAll(extraMetadata);
-
+                if (extraMetadata != null) {
+                    objectMetadata.putAll(extraMetadata);
+                }
                 DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata);
                 Payload indexPayload = Payloads.newInputStreamPayload(indexStream);
                 indexPayload.getContentMetadata().setContentLength((long) indexStream.getStreamSize());
@@ -246,143 +286,27 @@ public CompletableFuture offload(ReadHandle readHandle,
                 Blob blob = blobBuilder
                         .payload(indexPayload)
-                        .contentLength(indexStream.getStreamSize())
+                        .contentLength((long) indexStream.getStreamSize())
                         .build();
-                writeBlobStore.putBlob(bucket, blob);
+                writeBlobStore.putBlob(config.getBucket(), blob);
                 promise.complete(null);
             } catch (Throwable t) {
                 try {
-                    writeBlobStore.removeBlob(bucket, dataBlockKey);
+                    writeBlobStore.removeBlob(config.getBucket(), dataBlockKey);
                 } catch (Throwable throwable) {
-                    log.error("Failed deleteObject in bucket - {} with key - {}.", bucket, dataBlockKey, throwable);
+                    log.error("Failed deleteObject in bucket - {} with key - {}.",
+                            config.getBucket(), dataBlockKey, throwable);
                 }
                 this.offloaderStats.recordWriteToStorageError(topicName);
                 this.offloaderStats.recordOffloadError(topicName);
                 promise.completeExceptionally(t);
+                return;
             }
-        }).exceptionally(e -> {
-            try {
-                blobStore.abortMultipartUpload(mpu);
-            } catch (Throwable throwable) {
-                log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
-                        bucket, dataBlockKey, mpu.id(), throwable);
-            }
-            this.offloaderStats.recordWriteToStorageError(topicName);
-            this.offloaderStats.recordOffloadError(topicName);
-            promise.completeExceptionally(e);
-            return null;
         });
-        return promise;
     }
-
-    @SuppressWarnings("UnstableApiUsage")
-    private static final class BlobStorePartitionedReader implements Runnable {
-
-        private long written;
-        private long startEntry;
-        private final int maxBlockSize;
-        private final ReadHandle handle;
-        private final BlobStore blobStore;
-        private final MultipartUpload mp;
-        private final List parts;
-        private final OffloadIndexBlockBuilder indexBuilder;
-        private final CompletableFuture f;
-        private final LedgerOffloaderStats stats;
-        private final String topicName;
-        private final String bucket;
-        private final String dataBlockKey;
-        private final AtomicLong dataLen;
-        private final OrderedScheduler scheduler;
-
-        BlobStorePartitionedReader(int maxBlockSize, ReadHandle handle, BlobStore blobStore, MultipartUpload mp,
-                                   List parts, OffloadIndexBlockBuilder indexBuilder,
-                                   CompletableFuture f, LedgerOffloaderStats stats, String topicName,
-                                   String bucket, String dataBlockKey, AtomicLong dataLen,
-                                   OrderedScheduler scheduler) {
-            this.startEntry = 0;
-            this.written = 0;
-            this.maxBlockSize = maxBlockSize;
-            this.handle = handle;
-            this.blobStore = blobStore;
-            this.mp = mp;
-            this.parts = parts;
-            this.indexBuilder = indexBuilder;
-            this.f = f;
-            this.stats = stats;
-            this.topicName = topicName;
-            this.bucket = bucket;
-            this.dataBlockKey = dataBlockKey;
-            this.dataLen = dataLen;
-            this.scheduler = scheduler;
-        }
-
-        BlobStorePartitionedReader(BlobStorePartitionedReader r) {
-            this.startEntry = r.startEntry;
-            this.written = r.written;
-            this.maxBlockSize = r.maxBlockSize;
-            this.handle = r.handle;
-            this.blobStore = r.blobStore;
-            this.mp = r.mp;
-            this.parts = r.parts;
-            this.indexBuilder = r.indexBuilder;
-            this.f = r.f;
-            this.stats = r.stats;
-            this.topicName = r.topicName;
-            this.bucket = r.bucket;
-            this.dataBlockKey = r.dataBlockKey;
-            this.dataLen = r.dataLen;
-            this.scheduler = r.scheduler;
-        }
-
-        @Override
-        public void run() {
-            // start multi part upload for data block.
-            int partId = this.parts.size() + 1;
-            try {
-                int blockSize = BlockAwareSegmentInputStreamImpl
-                        .calculateBlockSize(maxBlockSize, handle, startEntry, written);
-
-                try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
-                        handle, startEntry, blockSize, this.stats, topicName)) {
-
-                    Payload partPayload = Payloads.newInputStreamPayload(blockStream);
-                    partPayload.getContentMetadata().setContentLength((long) blockSize);
-                    partPayload.getContentMetadata().setContentType("application/octet-stream");
-                    parts.add(blobStore.uploadMultipartPart(mp, partId, partPayload));
-                    log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
-                            bucket, dataBlockKey, partId, mp.id());
-
-                    indexBuilder.addBlock(startEntry, partId, blockSize);
-
-                    if (blockStream.getEndEntryId() != -1) {
-                        startEntry = blockStream.getEndEntryId() + 1;
-                        written += blockStream.getBlockEntryBytesCount();
-                        this.stats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount());
-                        this.dataLen.addAndGet(blockSize);
-                    }
-                }
-                this.processAfterTaskFinished();
-            } catch (Throwable t) {
-                log.error("Blob store offload failed. LedgerId {}, LedgerName {}, PartId {}, StartEntry {}.",
-                        handle.getId(), topicName, partId, startEntry, t);
-                f.completeExceptionally(t);
-            }
-        }
-
-        private void processAfterTaskFinished() {
-            if (!(startEntry <= handle.getLastAddConfirmed())) {
-                String etag = blobStore.completeMultipartUpload(mp, parts);
-                log.info("Ledger {}, upload finished, etag {}", handle.getId(), etag);
-                f.complete(null);
-            } else {
-                scheduler.chooseThread(handle.getId()).execute(new BlobStorePartitionedReader(this));
-            }
-        }
-    }
-
     BlobStore blobStore;
     String streamingDataBlockKey;
     String streamingDataIndexKey;
@@ -601,7 +525,7 @@ private PositionImpl lastOffered() {
      */
     private BlobStoreLocation getBlobStoreLocation(Map offloadDriverMetadata) {
         return (!offloadDriverMetadata.isEmpty()) ? new BlobStoreLocation(offloadDriverMetadata) :
-            new BlobStoreLocation(getOffloadDriverMetadata());
+                new BlobStoreLocation(getOffloadDriverMetadata());
     }
 
     @Override
@@ -676,8 +600,8 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid,
         scheduler.chooseThread(ledgerId).execute(() -> {
             try {
                 readBlobstore.removeBlobs(readBucket,
-                    ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
-                        DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
+                        ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
+                                DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
                 log.error("Failed delete Blob", t);
@@ -802,4 +726,4 @@ private String scanContainer(OffloadedLedgerMetadataConsumer consumer, BlobStore
         return pages.getNextMarker();
     }
 
-}
+}
\ No newline at end of file

From 12c303b39ee34a785e5aae3486737f10c9a326c1 Mon Sep 17 00:00:00 2001
From: daojun
Date: Mon, 28 Nov 2022 18:05:46 +0800
Subject: [PATCH 12/15] fix checkstyle

---
 .../offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index ce75335f5804c..a01c8ab64adbc 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -219,11 +219,10 @@ public CompletableFuture offload(ReadHandle readHandle,
             try {
                 long startEntry = 0;
                 int partId = 1;
-                long start = System.nanoTime();
                 long entryBytesWritten = 0;
                 while (startEntry <= readHandle.getLastAddConfirmed()) {
-                    int blockSize = BlockAwareSegmentInputStreamImpl
-                            .calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
+                    int blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(
+                            config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
 
                     try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
                             readHandle, startEntry, blockSize, this.offloaderStats, topicName)) {

From e71364a72c900f8ce8a0531183f1220e9b947cdc Mon Sep 17 00:00:00 2001
From: daojun
Date: Tue, 6 Dec 2022 16:18:43 +0800
Subject: [PATCH 13/15] fix test npe

---
 .../java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java | 2 +-
 .../java/org/apache/bookkeeper/mledger/OffloadReadHandle.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index d9130b08e211b..9e99f67bb89b8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -85,7 +85,7 @@ public class ManagedLedgerConfig {
     private int minimumBacklogCursorsForCaching = 0;
     private int minimumBacklogEntriesForCaching = 1000;
     private int maxBacklogBetweenCursorsForCaching = 1000;
-    private Long managedLedgerOffloadFlowPermitsPerSecond;
+    private long managedLedgerOffloadFlowPermitsPerSecond = -1;
     public boolean isCreateIfMissing() {
         return createIfMissing;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
index 5b57cc5701e61..cd99de90b9ee9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
@@ -33,7 +33,7 @@ public class OffloadReadHandle implements ReadHandle {
 
     private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
-    private static volatile Long flowPermits;
+    private static volatile long flowPermits = -1L;
     private static volatile TimeWindow window;
 
     private final ReadHandle delegator;

From 74971ba577f49c0625b2964a3ae0f7254eb8951e Mon Sep 17 00:00:00 2001
From: daojun
Date: Tue, 6 Dec 2022 18:02:02 +0800
Subject: [PATCH 14/15] fix OffloadReadHandleTest

---
 .../bookkeeper/mledger/OffloadReadHandle.java |  9 +++
 .../mledger/OffloadReadHandleTest.java        | 75 ++++++++++++-------
 2 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
index cd99de90b9ee9..d14256f7fd75f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
@@ -23,6 +23,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
@@ -202,4 +204,11 @@ public void run() {
             });
         }
     }
+
+    @VisibleForTesting
+    public void reset() {
+        INITIALIZED.set(false);
+        flowPermits = -1L;
+        window = null;
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java
index 933bf1417b1f7..3a0a6787e16be 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java
@@ -18,9 +18,10 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -31,41 +32,65 @@
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class OffloadReadHandleTest {
 
-    @Test
-    public void test() throws Exception {
+    @DataProvider(name = "flowPermits")
+    public Object[][] permits() {
+        return new Object[][]{
+                {-1L},
+                {0L},
+                {100L},
+                {10000L}
+        };
+    }
+
+    @Test(dataProvider = "flowPermits")
+    public void testFlowPermits(long flowPermits) throws Exception {
         ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100);
         for (int a = 0; a < 100; a++) {
             buf.writeByte(0);
         }
         LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf);
-        LedgerEntries entries = LedgerEntriesImpl.create(Collections.singletonList(entry));
-
-        ReadHandle handle = Mockito.mock(ReadHandle.class);
-        Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle).readAsync(1, 1);
+        List entryList = Lists.newArrayList(entry);
+        OffloadReadHandle h = null;
+        try (LedgerEntries entries = LedgerEntriesImpl.create(entryList)) {
+            ReadHandle handle = Mockito.mock(ReadHandle.class);
+            Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle).readAsync(1, 1);
 
-        long start = System.currentTimeMillis();
-        ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setManagedLedgerOffloadFlowPermitsPerSecond(100L);
+            long start = System.currentTimeMillis();
+            ManagedLedgerConfig config = new ManagedLedgerConfig();
+            config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits);
 
-        CompletableFuture future = OffloadReadHandle.create(handle, config,
-                OrderedScheduler.newSchedulerBuilder().numThreads(2).build());
-        ReadHandle h = future.get();
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
-        h.readAsync(1, 1);
+            CompletableFuture future = OffloadReadHandle.create(handle, config,
+                    OrderedScheduler.newSchedulerBuilder().numThreads(2).build());
+            h = (OffloadReadHandle) future.get();
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
+            h.read(1, 1);
 
-        Assert.assertTrue(System.currentTimeMillis() - start > TimeUnit.SECONDS.toMillis(8));
+            long actualDuration = System.currentTimeMillis() - start;
+            if (flowPermits <= 0L) {
+                Assert.assertEquals(actualDuration, 1000D, 1000D);
+            } else if (flowPermits == 100L) {
+                long expectDuration = TimeUnit.SECONDS.toMillis(8);
+                Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D);
+            } else if (flowPermits == 10000L) {
+                Assert.assertEquals(actualDuration, 1000D, 1000D);
+            }
+        } finally {
+            if (null != h) {
+                h.reset();
+            }
+        }
     }
-
 }

From d648ed03cc166fbe0820613610a934f8a0dc5802 Mon Sep 17 00:00:00 2001
From: daojun
Date: Tue, 6 Dec 2022 20:04:07 +0800
Subject: [PATCH 15/15] fix import order

---
 .../java/org/apache/bookkeeper/mledger/OffloadReadHandle.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
index d14256f7fd75f..253d82a3f6572 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java
@@ -18,13 +18,12 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerMetadata;