Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class ManagedLedgerConfig {
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private long managedLedgerOffloadFlowPermitsPerSecond = -1;

@Getter
@Setter
Expand Down Expand Up @@ -752,5 +753,23 @@ public String getShadowSource() {
return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
}

/**
* Set permitted size to offload on the broker.
*
* @param managedLedgerOffloadBrokerFlowPermit
*/
public void setManagedLedgerOffloadFlowPermitsPerSecond(long managedLedgerOffloadBrokerFlowPermit) {
this.managedLedgerOffloadFlowPermitsPerSecond = managedLedgerOffloadBrokerFlowPermit;
}

/**
* Get permitted size to offload on the broker.
*
* @return
*/
public long getManagedLedgerOffloadFlowPermitsPerSecond() {
return managedLedgerOffloadFlowPermitsPerSecond;
}

public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.util.qos.AsyncTokenBucket;

/**
* OffloadReadHandle is a wrapper of ReadHandle to offload read operations.
*/
public final class OffloadReadHandle implements ReadHandle {
private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
private static volatile long flowPermits = -1L;
private static volatile AsyncTokenBucket tokenBucket;

private final ReadHandle delegate;
private final long averageEntrySize;

private OffloadReadHandle(ReadHandle handle, ManagedLedgerConfig config,
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) {
initialize(config);
this.delegate = Objects.requireNonNull(handle);
Objects.requireNonNull(ledgerInfo);
long averageEntrySize = ledgerInfo.getSize() / ledgerInfo.getEntries();
if (averageEntrySize <= 0) {
averageEntrySize = 1;
}
this.averageEntrySize = averageEntrySize;
}

private static void initialize(ManagedLedgerConfig config) {
if (INITIALIZED.compareAndSet(false, true)) {
flowPermits = config.getManagedLedgerOffloadFlowPermitsPerSecond();
if (flowPermits > 0) {
tokenBucket = AsyncTokenBucket.builder().initialTokens(0).capacity(2 * flowPermits)
.rate(flowPermits).build();
}
}
}

public static CompletableFuture<ReadHandle> create(ReadHandle handle, ManagedLedgerConfig config,
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) {
return CompletableFuture.completedFuture(new OffloadReadHandle(handle, config, ledgerInfo));
}

@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
long numEntries = lastEntry - firstEntry + 1;
long numBytes = numEntries * averageEntrySize;

long delayMillis;
// block the offloader thread if the flow control permits is exceeded.
while ((delayMillis = calculateDelayMillis(numBytes)) > 0) {
try {
Thread.sleep(delayMillis);
} catch (InterruptedException ex) {
return CompletableFuture.failedFuture(ex);
}
}

return delegate.readAsync(firstEntry, lastEntry);
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return this.delegate.readUnconfirmedAsync(firstEntry, lastEntry);
}

@Override
public CompletableFuture<Long> readLastAddConfirmedAsync() {
return this.delegate.readLastAddConfirmedAsync();
}

@Override
public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
return this.delegate.tryReadLastAddConfirmedAsync();
}

@Override
public long getLastAddConfirmed() {
return this.delegate.getLastAddConfirmed();
}

@Override
public long getLength() {
return this.delegate.getLength();
}

@Override
public boolean isClosed() {
return this.delegate.isClosed();
}

@Override
public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(
long entryId, long timeOutInMillis, boolean parallel) {
return this.delegate.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
}

@Override
public long getId() {
return this.delegate.getId();
}

@Override
public CompletableFuture<Void> closeAsync() {
return this.delegate.closeAsync();
}

@Override
public LedgerMetadata getLedgerMetadata() {
return this.delegate.getLedgerMetadata();
}


private static synchronized long calculateDelayMillis(long numBytes) {
if (flowPermits <= 0) {
return 0;
}
if (numBytes <= 0) {
return 0;
}

if (tokenBucket.containsTokens(true)) {
long token = tokenBucket.getTokens();
if (token > 0) {
// To prevent flowPermits is less than each batch size.
tokenBucket.consumeTokens(numBytes);
return 0;
}
}

return TimeUnit.NANOSECONDS.toMillis(tokenBucket.calculateThrottlingDuration());
}

@VisibleForTesting
public void reset() {
INITIALIZED.set(false);
flowPermits = -1L;
tokenBucket = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.OffloadReadHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
Expand Down Expand Up @@ -3255,6 +3256,7 @@ void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledg

prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata)
.thenCompose((ignore) -> getLedgerHandle(ledgerId))
.thenCompose(h -> OffloadReadHandle.create(h, config, info))
.thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
.thenCompose((ignore) -> {
return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class OffloadReadHandleTest extends MockedBookKeeperTestCase {

@DataProvider(name = "flowPermits")
public Object[][] permits() {
return new Object[][]{
{-1L},
{0L},
{50L},
{100L},
{10000L}
};
}

@Test(dataProvider = "flowPermits")
public void testFlowPermits(long flowPermits) throws Exception {
OffloadReadHandle handle = (OffloadReadHandle) initializeReadHandle(flowPermits);
try {
long start = System.currentTimeMillis();
handle.read(1, 1);
handle.read(1, 1);
handle.read(1, 1);
handle.read(1, 1);
handle.read(1, 1);

long actualDuration = System.currentTimeMillis() - start;
if (flowPermits <= 0L) {
Assert.assertEquals(actualDuration, 4000D, 4000D);
} else if (flowPermits == 50L) {
long expectDuration = 8000;
Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D);
} else if (flowPermits == 100L) {
long expectDuration = 4000;
Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D);
} else if (flowPermits == 10000L) {
Assert.assertEquals(actualDuration, 1000D, 1000D);
}
} finally {
handle.close();
handle.reset();
}
}


@Test
public void testOffloadFlowPermitsMultiThreads() throws Exception {
OffloadReadHandle handle = (OffloadReadHandle) initializeReadHandle(1000);
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(10);
try {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 10; j++) {
try {
handle.read(1, 1);
} catch (Exception e) {
failed.set(true);
}
}
latch.countDown();
}).start();
}

latch.await();

Assert.assertFalse(failed.get());
long actualDuration = System.currentTimeMillis() - start;
long expectDuration = TimeUnit.SECONDS.toMillis(10);
Assert.assertEquals(actualDuration, expectDuration, expectDuration * 0.2D);
} finally {
handle.close();
handle.reset();
}
}


private ReadHandle initializeReadHandle(long flowPermits) throws Exception {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(100);
for (int a = 0; a < 100; a++) {
buf.writeByte(0);
}
LedgerEntry entry = LedgerEntryImpl.create(1, 1, buf.readableBytes(), buf);
List<LedgerEntry> entryList = Lists.newArrayList(entry);
LedgerEntries entries = LedgerEntriesImpl.create(entryList);
ReadHandle handle = Mockito.mock(ReadHandle.class);
Mockito.doAnswer(inv -> CompletableFuture.completedFuture(entries)).when(handle)
.readAsync(Mockito.anyLong(), Mockito.anyLong());
Mockito.doAnswer(inv -> {
entries.close();
return CompletableFuture.completedFuture(null);
}).when(handle).closeAsync();

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setManagedLedgerOffloadFlowPermitsPerSecond(flowPermits);

CompletableFuture<ReadHandle> future = OffloadReadHandle.create(handle, config,
MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(1)
.setEntries(1).setSize(100).build());

return future.get();
}
}
2 changes: 1 addition & 1 deletion microbench/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.pulsar.broker.qos;
package org.apache.pulsar.common.util.qos;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
/**
* Benchmarks for Pulsar broker Quality of Service (QoS) related classes.
*/
package org.apache.pulsar.broker.qos;
package org.apache.pulsar.common.util.qos;
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "The threshold to triggering automatic offload to long term storage"
)
private long managedLedgerOffloadThresholdInSeconds = -1L;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The number of bytes permitted per second to offload on this broker"
)
private long managedLedgerOffloadFlowPermitsPerSecond = -1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of entries to append to a cursor ledger"
Expand Down
Loading
Loading