diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index c02a781fda354..9e99f67bb89b8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,6 +85,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + private long managedLedgerOffloadFlowPermitsPerSecond = -1; public boolean isCreateIfMissing() { return createIfMissing; @@ -749,4 +750,23 @@ public String getShadowSource() { } public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; + + /** + * Set permitted size to offload on the broker. + * + * @param managedLedgerOffloadBrokerFlowPermit + */ + public void setManagedLedgerOffloadFlowPermitsPerSecond(long managedLedgerOffloadBrokerFlowPermit) { + this.managedLedgerOffloadFlowPermitsPerSecond = managedLedgerOffloadBrokerFlowPermit; + } + + /** + * Get permitted size to offload on the broker. + * + * @return + */ + public long getManagedLedgerOffloadFlowPermitsPerSecond() { + return managedLedgerOffloadFlowPermitsPerSecond; + } + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java new file mode 100644 index 0000000000000..253d82a3f6572 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadReadHandle.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.util.TimeWindow; +import org.apache.bookkeeper.mledger.util.WindowWrap; + +public class OffloadReadHandle implements ReadHandle { + private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); + private static volatile long flowPermits = -1L; + private static volatile TimeWindow window; + + private final ReadHandle delegator; + private final OrderedScheduler scheduler; + + private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config, + OrderedScheduler scheduler) { + initialize(config); + this.delegator = handle; + this.scheduler = Objects.requireNonNull(scheduler); + } + + private static void initialize(ManagedLedgerConfig config) { + if (INITIALIZED.compareAndSet(false, true)) { + flowPermits = config.getManagedLedgerOffloadFlowPermitsPerSecond(); + window = new TimeWindow<>(2, 1000); + } + } + + public static CompletableFuture create(ReadHandle handle, ManagedLedgerConfig config, + OrderedScheduler scheduler) { + return CompletableFuture.completedFuture(new OffloadReadHandle(handle, config, scheduler)); + } + + @Override + public CompletableFuture readAsync(long firstEntry, long lastEntry) { + final long delayMills = calculateDelayMillis(); + if (delayMills > 0) { + CompletableFuture f = new CompletableFuture<>(); + Runnable cmd = new ReadAsyncCommand(firstEntry, lastEntry, f); + scheduler.schedule(cmd, delayMills, TimeUnit.MILLISECONDS); + return f; + } + + return this.delegator + .readAsync(firstEntry, lastEntry) + .whenComplete((v, t) -> { + if (t == null) { + recordReadBytes(v); + } + }); + } + + @Override + public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { + return this.delegator.readUnconfirmedAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture readLastAddConfirmedAsync() { + return this.delegator.readLastAddConfirmedAsync(); + } + + @Override + public CompletableFuture tryReadLastAddConfirmedAsync() { + return this.delegator.tryReadLastAddConfirmedAsync(); + } + + @Override + public long getLastAddConfirmed() { + return this.delegator.getLastAddConfirmed(); + } + + @Override + public long getLength() { + return this.delegator.getLength(); + } + + @Override + public boolean isClosed() { + return this.delegator.isClosed(); + } + + @Override + public CompletableFuture readLastAddConfirmedAndEntryAsync( + long entryId, long timeOutInMillis, boolean parallel) { + return this.delegator.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); + } + + @Override + public long getId() { + return this.delegator.getId(); + } + + @Override + public CompletableFuture closeAsync() { + return this.delegator.closeAsync(); + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return this.delegator.getLedgerMetadata(); + } + + + private long calculateDelayMillis() { + if (flowPermits <= 0) { + return 0; + } + + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); + if (wrap == null) { + // it should never goes here + return 0; + } + + if (wrap.value().get() >= flowPermits) { + // park until next window start + long end = wrap.start() + wrap.interval(); + return end - System.currentTimeMillis(); + } + + return 0; + } + + private void recordReadBytes(LedgerEntries entries) { + if (flowPermits <= 0) { + return; + } + + if (entries == null) { + return; + } + + AtomicLong num = new AtomicLong(0); + entries.forEach(en -> num.addAndGet(en.getLength())); + + WindowWrap wrap = window.current(__ -> new AtomicLong(0)); + if (wrap == null) { + // it should never goes here + return; + } + + wrap.value().addAndGet(num.get()); + } + + + private final class ReadAsyncCommand implements Runnable { + + private final long firstEntry; + private final long lastEntry; + private final CompletableFuture f; + + ReadAsyncCommand(long firstEntry, long lastEntry, CompletableFuture f) { + this.firstEntry = firstEntry; + this.lastEntry = lastEntry; + this.f = f; + } + + @Override + public void run() { + long delayMillis = calculateDelayMillis(); + if (delayMillis > 0) { + scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS); + return; + } + + delegator.readAsync(firstEntry, lastEntry) + .whenComplete((entries, e) -> { + if (e != null) { + f.completeExceptionally(e); + } else { + f.complete(entries); + recordReadBytes(entries); + } + }); + } + } + + @VisibleForTesting + public void reset() { + INITIALIZED.set(false); + flowPermits = -1L; + window = null; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 46f79a7146279..7926f339b6832 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -112,6 +112,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.OffloadReadHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; @@ -3126,15 +3127,16 @@ private void offloadLoop(CompletableFuture promise, Queue driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) - .thenCompose((ignore) -> getLedgerHandle(ledgerId)) - .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) - .thenCompose((ignore) -> { + .thenCompose((ignore) -> getLedgerHandle(ledgerId)) + .thenCompose(handle -> OffloadReadHandle.create(handle, config, scheduledExecutor)) + .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) + .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), - TimeUnit.SECONDS.toHours(1)).limit(10), - FAIL_ON_CONFLICT, - () -> completeLedgerInfoForOffloaded(ledgerId, uuid), - scheduledExecutor, name) - .whenComplete((ignore2, exception) -> { + TimeUnit.SECONDS.toHours(1)).limit(10), + FAIL_ON_CONFLICT, + () -> completeLedgerInfoForOffloaded(ledgerId, uuid), + scheduledExecutor, name) + .whenComplete((ignore2, exception) -> { if (exception != null) { Throwable e = FutureUtil.unwrapCompletionException(exception); if (e instanceof MetaStoreException) { @@ -3157,7 +3159,7 @@ private void offloadLoop(CompletableFuture promise, Queue { + .whenComplete((ignore, exception) -> { if (exception != null) { lastOffloadFailureTimestamp = System.currentTimeMillis(); log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, @@ -3174,9 +3176,7 @@ private void offloadLoop(CompletableFuture promise, Queue { @@ -26,51 +28,91 @@ public final class TimeWindow { private final int sampleCount; private final AtomicReferenceArray> array; + private final Lock updateLock = new ReentrantLock(); + public TimeWindow(int sampleCount, int interval) { this.sampleCount = sampleCount; this.interval = interval; this.array = new AtomicReferenceArray<>(sampleCount); } + + public WindowWrap current(Function function) { + return current(function, System.currentTimeMillis()); + } + + /** * return current time window data. * * @param function generate data. * @return */ - public synchronized WindowWrap current(Function function) { - long millis = System.currentTimeMillis(); - - if (millis < 0) { + public WindowWrap current(Function function, long timeMillis) { + if (timeMillis < 0) { return null; } - int idx = calculateTimeIdx(millis); - long windowStart = calculateWindowStart(millis); + + int idx = calculateTimeIdx(timeMillis); + // Calculate current bucket start time. + long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap old = array.get(idx); if (old == null) { - WindowWrap window = new WindowWrap<>(interval, windowStart, null); + WindowWrap window = new WindowWrap<>(interval, windowStart, function.apply(null)); if (array.compareAndSet(idx, null, window)) { - T value = null == function ? null : function.apply(null); - window.value(value); return window; } else { + // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.start()) { return old; } else if (windowStart > old.start()) { - T value = null == function ? null : function.apply(old.value()); - old.value(value); - old.resetWindowStart(windowStart); - return old; + if (updateLock.tryLock()) { + try { + // Successfully get the update lock, now we reset the bucket. + T value = null == function ? null : function.apply(old.value()); + old.value(value); + old.resetWindowStart(windowStart); + return old; + } finally { + updateLock.unlock(); + } + } else { + // Contention failed, the thread will yield its time slice to wait for bucket available. + Thread.yield(); + } } else { - //it should never goes here + //when windowStart < old.value() + // Should not go through here, as the provided time is already behind. throw new IllegalStateException(); } } } + /** + * return next time window data. + * + * @param function generate data. + */ + public WindowWrap next(Function function, long timeMillis) { + if (this.sampleCount <= 1) { + throw new IllegalStateException("Argument sampleCount cannot less than 2"); + } + return this.current(function, timeMillis + interval); + } + + + /** + * return next time window data. + * + * @param function generate data. + */ + public WindowWrap next(Function function) { + return this.next(function, System.currentTimeMillis()); + } + private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / this.interval; return (int) (timeId % sampleCount); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/WindowWrap.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/WindowWrap.java index 12869b82921e5..b215e9dba9a6a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/WindowWrap.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.bookkeeper.mledger.util; public final class WindowWrap { private final long interval; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java new file mode 100644 index 0000000000000..3a0a6787e16be --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/OffloadReadHandleTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class OffloadReadHandleTest { + + @DataProvider(name = "flowPermits") + public Object[][] permits() { + return new Object[][]{ + {-1L}, + {0L}, + {100L}, + {10000L} + }; + } + + @Test(dataProvider = "flowPermits") + public void testFlowPermits(long flowPermits) throws Exception { + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100); + for (int a = 0; a < 100; a++) { + buf.writeByte(0); + } + LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf); + List entryList = Lists.newArrayList(entry); + OffloadReadHandle h = null; + try (LedgerEntries entries = LedgerEntriesImpl.create(entryList)) { + ReadHandle handle = Mockito.mock(ReadHandle.class); + Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle).readAsync(1, 1); + + long start = System.currentTimeMillis(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits); + + CompletableFuture future = OffloadReadHandle.create(handle, config, + OrderedScheduler.newSchedulerBuilder().numThreads(2).build()); + h = (OffloadReadHandle) future.get(); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + h.read(1, 1); + + long actualDuration = System.currentTimeMillis() - start; + if (flowPermits <= 0L) { + Assert.assertEquals(actualDuration, 1000D, 1000D); + } else if (flowPermits == 100L) { + long expectDuration = TimeUnit.SECONDS.toMillis(8); + Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D); + } else if (flowPermits == 10000L) { + Assert.assertEquals(actualDuration, 1000D, 1000D); + } + } finally { + if (null != h) { + h.reset(); + } + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/TimeWindowTest.java similarity index 95% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java rename to managed-ledger/src/test/java/org/apache/bookkeeper/test/TimeWindowTest.java index 89528c1965397..01c8aee41b9ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/TimeWindowTest.java @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.bookkeeper.test; + +import org.apache.bookkeeper.mledger.util.TimeWindow; +import org.apache.bookkeeper.mledger.util.WindowWrap; +import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import org.testng.annotations.Test; public class TimeWindowTest { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d7617ef277444..507c3ab3908e8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1974,6 +1974,11 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se doc = "The threshold to triggering automatic offload to long term storage" ) private long managedLedgerOffloadThresholdInSeconds = -1L; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The number of bytes permitted per second to offload on this broker" + ) + private long managedLedgerOffloadFlowPermitsPerSecond = -1; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a cursor ledger" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2015128b9e800..acd2027db21b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1750,6 +1750,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + managedLedgerConfig.setManagedLedgerOffloadFlowPermitsPerSecond( + serviceConfig.getManagedLedgerOffloadFlowPermitsPerSecond()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 5283215feaac6..ce3fd5294ef3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -42,12 +42,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.util.TimeWindow; +import org.apache.bookkeeper.mledger.util.WindowWrap; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.TimeWindow; -import org.apache.pulsar.broker.stats.WindowWrap; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index db59942e15ce5..979d881932a50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -58,6 +58,7 @@ import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.util.TimeWindow; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index d5e09ba725421..56796ec8a310f 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -144,8 +144,8 @@ public Map getOffloadDriverMetadata() { } /* - * ledgerMetadata stored in an index of -1 - * */ + * ledgerMetadata stored in an index of -1 + * */ @Override public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { CompletableFuture promise = new CompletableFuture<>(); @@ -394,4 +394,4 @@ public void close() { } } } -} +} \ No newline at end of file diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 6d69b5edbc3fb..a01c8ab64adbc 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -188,8 +188,8 @@ public CompletableFuture offload(ReadHandle readHandle, return; } OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() - .withLedgerMetadata(readHandle.getLedgerMetadata()) - .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); + .withLedgerMetadata(readHandle.getLedgerMetadata()) + .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid); String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid); log.info("ledger {} dataBlockKey {} indexBlockKey {}", readHandle.getId(), dataBlockKey, indexBlockKey); @@ -203,7 +203,7 @@ public CompletableFuture offload(ReadHandle readHandle, Map objectMetadata = new HashMap<>(userMetadata); objectMetadata.put("role", "data"); if (extraMetadata != null) { - objectMetadata.putAll(extraMetadata); + objectMetadata.putAll(extraMetadata); } DataBlockUtils.addVersionInfo(blobBuilder, objectMetadata); Blob blob = blobBuilder.build(); @@ -219,11 +219,10 @@ public CompletableFuture offload(ReadHandle readHandle, try { long startEntry = 0; int partId = 1; - long start = System.nanoTime(); long entryBytesWritten = 0; while (startEntry <= readHandle.getLastAddConfirmed()) { - int blockSize = BlockAwareSegmentInputStreamImpl - .calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten); + int blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize( + config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten); try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( readHandle, startEntry, blockSize, this.offloaderStats, topicName)) { @@ -287,7 +286,7 @@ public CompletableFuture offload(ReadHandle readHandle, Blob blob = blobBuilder .payload(indexPayload) .contentLength((long) indexStream.getStreamSize()) - .build(); + .build(); writeBlobStore.putBlob(config.getBucket(), blob); promise.complete(null); } catch (Throwable t) { @@ -525,7 +524,7 @@ private PositionImpl lastOffered() { */ private BlobStoreLocation getBlobStoreLocation(Map offloadDriverMetadata) { return (!offloadDriverMetadata.isEmpty()) ? new BlobStoreLocation(offloadDriverMetadata) : - new BlobStoreLocation(getOffloadDriverMetadata()); + new BlobStoreLocation(getOffloadDriverMetadata()); } @Override @@ -600,8 +599,8 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, scheduler.chooseThread(ledgerId).execute(() -> { try { readBlobstore.removeBlobs(readBucket, - ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid), - DataBlockUtils.indexBlockOffloadKey(ledgerId, uid))); + ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid), + DataBlockUtils.indexBlockOffloadKey(ledgerId, uid))); promise.complete(null); } catch (Throwable t) { log.error("Failed delete Blob", t); @@ -726,4 +725,4 @@ private String scanContainer(OffloadedLedgerMetadataConsumer consumer, BlobStore return pages.getNextMarker(); } -} +} \ No newline at end of file