Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class BookieImpl implements Bookie {
private final ByteBufAllocator allocator;

private final boolean writeDataToJournal;
private final boolean journalHashBasedSelection;

// Write Callback do nothing
static class NopWriteCallback implements WriteCallback {
Expand Down Expand Up @@ -406,6 +407,7 @@ public BookieImpl(ServerConfiguration conf,
this.ledgerDirsManager = ledgerDirsManager;
this.indexDirsManager = indexDirsManager;
this.writeDataToJournal = conf.getJournalWriteData();
this.journalHashBasedSelection = conf.getJournalHashBasedSelection();
this.allocator = allocator;
this.registrationManager = registrationManager;
stateManager = initializeStateManager();
Expand Down Expand Up @@ -940,8 +942,43 @@ LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
return handles.getHandle(ledgerId, masterKey, false);
}

/**
* Constant for Fibonacci hashing. Multiplying by this constant adds randomness, breaking
* patterns in ledger IDs that would otherwise cause uneven journal distribution.
*
* <p>This is the same constant used as {@code GOLDEN_GAMMA} in
* {@link java.util.concurrent.ThreadLocalRandom} and {@link java.util.SplittableRandom}
* for seed mixing.
*/
private static final long FIBONACCI_HASH_CONSTANT = 0x9E3779B97F4A7C15L;

/**
* Computes the journal index for a given ledger ID.
*
* <p>When hash-based selection is enabled, uses Fibonacci hashing to distribute ledgers
* across journals. The xor with the right-shifted value folds the high 32 bits into
* the low 32 bits, adding more mixing prior to the modulo.
*
* @param ledgerId the ledger ID
* @param numJournals the number of journals
* @param hashBasedSelection whether to use hash-based selection
* @return the journal index (0 to numJournals-1)
*/
@VisibleForTesting
static int computeJournalIndex(long ledgerId, int numJournals, boolean hashBasedSelection) {
long index = ledgerId;
if (hashBasedSelection) {
index = ledgerId * FIBONACCI_HASH_CONSTANT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about putting this block into a static method so that we can add unit tests ?

The function gets the index and the number of journals and computes the journal index

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review. I added the tests and factored out the static method.

index ^= index >>> 32;
}
return MathUtils.signSafeMod(index, numJournals);
}

/**
* Returns the journal to use for the given ledger.
*/
private Journal getJournal(long ledgerId) {
return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
return journals.get(computeJournalIndex(ledgerId, journals.size(), journalHashBasedSelection));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC = "journalPageCacheFlushIntervalMSec";
protected static final String JOURNAL_CHANNEL_PROVIDER = "journalChannelProvider";
protected static final String JOURNAL_REUSE_FILES = "journalReuseFiles";
protected static final String JOURNAL_HASH_BASED_SELECTION = "journalHashBasedSelection";
// backpressure control
protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
Expand Down Expand Up @@ -2335,6 +2336,40 @@ public ServerConfiguration setJournalWriteData(boolean journalWriteData) {
return this;
}

/**
* Whether to use hash-based journal selection for ledgers.
*
* <p>When enabled, ledger IDs are hashed using Fibonacci hashing before
* selecting a journal. In some deployments (e.g., sharded setups), patterns
* in ledger IDs can cause uneven distribution across journals. This setting
* breaks those patterns.
*
* <p>WARNING: This setting is not backwards compatible. Changing this
* setting on an existing bookie will cause ledgers to be mapped to
* different journals than before, which can cause issues during recovery.
* Only enable this on new clusters or after careful migration planning.
*
* <p>Default is false for backwards compatibility.
*
* @return whether hash-based journal selection is enabled
*/
public boolean getJournalHashBasedSelection() {
return getBoolean(JOURNAL_HASH_BASED_SELECTION, false);
}

/**
* Enable or disable hash-based journal selection for ledgers.
*
* <p>See {@link #getJournalHashBasedSelection()} for details.
*
* @param enabled whether to enable hash-based journal selection
* @return server configuration object
*/
public ServerConfiguration setJournalHashBasedSelection(boolean enabled) {
setProperty(JOURNAL_HASH_BASED_SELECTION, enabled);
return this;
}

/**
* Enable or disable journal syncs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,51 @@ public void testMultipleWritesAndBookieRestart() throws Exception {
}
}

/**
* Test that hash-based selection still allows correct read/write operations.
*
* This integration test verifies that when journalHashBasedSelection is enabled,
* entries can still be written and read back correctly.
*/
@Test
public void testHashBasedSelectionReadWrite() throws Exception {
// Restart bookies with hash-based selection enabled
restartBookies(conf -> {
conf.setJournalHashBasedSelection(true);
return conf;
});

// Create ledgers and write entries
final int numLedgers = 8;
final int numEntries = 10;
List<LedgerHandle> writeHandles = new ArrayList<>();

for (int i = 0; i < numLedgers; i++) {
writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]));
}

for (int i = 0; i < numEntries; i++) {
for (int j = 0; j < numLedgers; j++) {
writeHandles.get(j).addEntry(("entry-" + i).getBytes());
}
}

// Close write handles
for (LedgerHandle lh : writeHandles) {
lh.close();
}

// Read back and verify
for (int i = 0; i < numLedgers; i++) {
LedgerHandle readHandle = bkc.openLedger(writeHandles.get(i).getId(), DigestType.CRC32, new byte[0]);
Enumeration<LedgerEntry> entries = readHandle.readEntries(0, numEntries - 1);

for (int j = 0; j < numEntries; j++) {
LedgerEntry entry = entries.nextElement();
assertEquals("entry-" + j, new String(entry.getEntry()));
}
readHandle.close();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Unit tests for journal index computation in BookieImpl.
 *
 * <p>Checks that hash-based journal selection spreads ledgers close to evenly
 * over the journals for a variety of ledger-ID patterns, and that the plain
 * modulo path behaves as before.
 */
@RunWith(Parameterized.class)
public class JournalIndexComputationTest {

    // Allowed relative deviation from a perfectly uniform distribution.
    private static final double TOLERANCE = 0.10;
    private static final int NUM_LEDGERS = 10000;

    private final int numJournals;
    private final int stride;

    public JournalIndexComputationTest(int numJournals, int stride, String description) {
        this.numJournals = numJournals;
        this.stride = stride;
    }

    @Parameterized.Parameters(name = "{2}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[][] {
            // Sequential IDs (stride=1) with various journal counts
            {1, 1, "1 journal"},
            {3, 1, "3 journals, sequential"},
            {4, 1, "4 journals, sequential"},
            {7, 1, "7 journals (prime), sequential"},
            {8, 1, "8 journals, sequential"},
            {15, 1, "15 journals, sequential"},

            // Strided IDs
            {4, 2, "4 journals, stride=2"},
            {4, 3, "4 journals, stride=3"},
            {4, 4, "4 journals, stride=4 (worst case without hash)"},
            {8, 8, "8 journals, stride=8 (worst case without hash)"},
            {4, 1024, "4 journals, stride=1024 (worst case without hash)"},
            {8, 1024, "8 journals, stride=1024 (worst case without hash)"},
        });
    }

    // With hashing enabled, strided ledger IDs should land near-uniformly
    // (and deterministically) on the journals.
    @Test
    public void testEvenDistributionWithHashing() {
        int[] histogram = new int[numJournals];

        for (int n = 0; n < NUM_LEDGERS; n++) {
            long ledgerId = (long) n * stride;
            int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, true);
            assertTrue("Index out of bounds", idx >= 0 && idx < numJournals);
            histogram[idx]++;

            // Same input must always map to the same journal.
            assertEquals("Hash not deterministic", idx,
                    BookieImpl.computeJournalIndex(ledgerId, numJournals, true));
        }

        assertNearEvenDistribution(histogram);
    }

    // With hashing disabled, the index must equal the plain modulo of the ID.
    @Test
    public void testSimpleModuloWithoutHashing() {
        for (int n = 0; n < NUM_LEDGERS; n++) {
            long ledgerId = (long) n * stride;
            int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, false);
            assertTrue("Index out of bounds", idx >= 0 && idx < numJournals);

            // Same input must always map to the same journal.
            assertEquals("Result not deterministic", idx,
                    BookieImpl.computeJournalIndex(ledgerId, numJournals, false));

            // For non-negative IDs the result is exactly ledgerId % numJournals.
            assertEquals("Should match simple modulo", (int) (ledgerId % numJournals), idx);
        }
    }

    // Negative and near-Long.MAX_VALUE ledger IDs must still yield indices in
    // range for both hash and plain-modulo modes.
    @Test
    public void testEdgeCaseLedgerIds() {
        for (boolean useHash : new boolean[] {true, false}) {
            String mode = useHash ? "hash" : "modulo";

            for (long ledgerId = -100; ledgerId < 0; ledgerId++) {
                int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, useHash);
                assertTrue("Negative ID produced invalid index (" + mode + ")",
                        idx >= 0 && idx < numJournals);
            }

            for (int offset = 0; offset < 100; offset++) {
                int idx = BookieImpl.computeJournalIndex(Long.MAX_VALUE - offset, numJournals, useHash);
                assertTrue("Large ID produced invalid index (" + mode + ")",
                        idx >= 0 && idx < numJournals);
            }
        }
    }

    // Asserts every journal's count is within TOLERANCE of the uniform share.
    private void assertNearEvenDistribution(int[] counts) {
        int total = Arrays.stream(counts).sum();
        int expected = total / counts.length;
        int maxDeviation = (int) (expected * TOLERANCE);

        for (int journal = 0; journal < counts.length; journal++) {
            int deviation = Math.abs(counts[journal] - expected);
            assertTrue(String.format("journal %d: %d (expected ~%d, max deviation %d)",
                    journal, counts[journal], expected, maxDeviation),
                    deviation <= maxDeviation);
        }
    }
}
13 changes: 13 additions & 0 deletions conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ journalDirectories=/tmp/bk-txn
# Set the Channel Provider for journal.
# The default value is
# journalChannelProvider=org.apache.bookkeeper.bookie.DefaultFileChannelProvider

# Enable hash-based selection of journals for ledgers in multi-journal setups.
# By default, ledgers are assigned to journals using ledgerId % numJournals.
# In some deployments (e.g., sharded setups), patterns in ledger IDs can cause
# uneven distribution across journals. When enabled, Fibonacci hashing is used
# for better distribution.
#
# WARNING: This setting is not backwards compatible. Changing this setting on an
# existing bookie will cause ledgers to be mapped to different journals than before,
# which can cause issues during recovery. Only enable this on new clusters or after
# careful migration planning.
# journalHashBasedSelection=false

#############################################################################
## Ledger storage settings
#############################################################################
Expand Down
5 changes: 3 additions & 2 deletions site3/website/docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ The table below lists parameters that you can set to configure bookies. All conf
| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
| journalAlignmentSize | All the journal writes and commits should be aligned to given size. If not, zeros will be padded to align to given size. | 512 |
| journalBufferedEntriesThreshold | Maximum entries to buffer to impose on a journal write to achieve grouping. | |
| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
| journalQueueSize | Set the size of the journal queue. | 10000 |
| journalHashBasedSelection | Enable hash-based selection of journals for ledgers in multi-journal setups. By default, ledgers are assigned to journals using ledgerId % numJournals. In some deployments (e.g., sharded setups), patterns in ledger IDs can cause uneven distribution across journals. When enabled, Fibonacci hashing is used for better distribution.<br /><br />**WARNING**: This setting is not backwards compatible. Changing this setting on an existing bookie will cause ledgers to be mapped to different journals than before, which can cause issues during recovery. Only enable this on new clusters or after careful migration planning. | false |


## Ledger storage settings
Expand Down
Loading