From bb70d7ada39e887a2672b821f529db0057508c2a Mon Sep 17 00:00:00 2001 From: Justin Prieto Date: Wed, 28 Jan 2026 16:33:23 -0500 Subject: [PATCH 1/3] Add hash-based journal selection for improved ledger distribution --- .../apache/bookkeeper/bookie/BookieImpl.java | 26 +++++++++- .../bookkeeper/conf/ServerConfiguration.java | 35 ++++++++++++++ .../bookie/BookieMultipleJournalsTest.java | 47 +++++++++++++++++++ conf/bk_server.conf | 13 +++++ site3/website/docs/reference/config.md | 5 +- 5 files changed, 123 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 111f5134e78..38bb5d49eee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -127,6 +127,7 @@ public class BookieImpl implements Bookie { private final ByteBufAllocator allocator; private final boolean writeDataToJournal; + private final boolean journalHashBasedSelection; // Write Callback do nothing static class NopWriteCallback implements WriteCallback { @@ -406,6 +407,7 @@ public BookieImpl(ServerConfiguration conf, this.ledgerDirsManager = ledgerDirsManager; this.indexDirsManager = indexDirsManager; this.writeDataToJournal = conf.getJournalWriteData(); + this.journalHashBasedSelection = conf.getJournalHashBasedSelection(); this.allocator = allocator; this.registrationManager = registrationManager; stateManager = initializeStateManager(); @@ -940,8 +942,30 @@ LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey) return handles.getHandle(ledgerId, masterKey, false); } + /** + * Constant for Fibonacci hashing. Multiplying by this constant adds randomness, breaking + * patterns in ledger IDs that would otherwise cause uneven journal distribution. + * + *

This is the same constant used as {@code GOLDEN_GAMMA} in + * {@link java.util.concurrent.ThreadLocalRandom} and {@link java.util.SplittableRandom} + * for seed mixing. + */ + private static final long FIBONACCI_HASH_CONSTANT = 0x9E3779B97F4A7C15L; + + /** + * Returns the journal to use for the given ledger. + * + *

When hash-based selection is enabled, uses Fibonacci hashing to distribute ledgers + * across journals. The xor with the right-shifted value folds the high 32 bits into + * the low 32 bits, adding more mixing prior to the modulo. + */ private Journal getJournal(long ledgerId) { - return journals.get(MathUtils.signSafeMod(ledgerId, journals.size())); + long index = ledgerId; + if (journalHashBasedSelection) { + index = ledgerId * FIBONACCI_HASH_CONSTANT; + index ^= index >>> 32; + } + return journals.get(MathUtils.signSafeMod(index, journals.size())); } @VisibleForTesting diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 79daeb3f5d3..21747122636 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -156,6 +156,7 @@ public class ServerConfiguration extends AbstractConfigurationWhen enabled, ledger IDs are hashed using Fibonacci hashing before + * selecting a journal. In some deployments (e.g., sharded setups), patterns + * in ledger IDs can cause uneven distribution across journals. This setting + * breaks those patterns. + * + *

WARNING: This setting is not backwards compatible. Changing this + * setting on an existing bookie will cause ledgers to be mapped to + * different journals than before, which can cause issues during recovery. + * Only enable this on new clusters or after careful migration planning. + * + *

Default is false for backwards compatibility. + * + * @return whether hash-based journal selection is enabled + */ + public boolean getJournalHashBasedSelection() { + return getBoolean(JOURNAL_HASH_BASED_SELECTION, false); + } + + /** + * Enable or disable hash-based journal selection for ledgers. + * + *

See {@link #getJournalHashBasedSelection()} for details. + * + * @param enabled whether to enable hash-based journal selection + * @return server configuration object + */ + public ServerConfiguration setJournalHashBasedSelection(boolean enabled) { + setProperty(JOURNAL_HASH_BASED_SELECTION, enabled); + return this; + } + /** * Enable or disable journal syncs. * diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java index 664e31541bf..b0d265a1ea4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java @@ -141,4 +141,51 @@ public void testMultipleWritesAndBookieRestart() throws Exception { } } + /** + * Test that hash-based selection still allows correct read/write operations. + * + * This integration test verifies that when journalHashBasedSelection is enabled, + * entries can still be written and read back correctly. + */ + @Test + public void testHashBasedSelectionReadWrite() throws Exception { + // Restart bookies with hash-based selection enabled + restartBookies(conf -> { + conf.setJournalHashBasedSelection(true); + return conf; + }); + + // Create ledgers and write entries + final int numLedgers = 8; + final int numEntries = 10; + List writeHandles = new ArrayList<>(); + + for (int i = 0; i < numLedgers; i++) { + writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new byte[0])); + } + + for (int i = 0; i < numEntries; i++) { + for (int j = 0; j < numLedgers; j++) { + writeHandles.get(j).addEntry(("entry-" + i).getBytes()); + } + } + + // Close write handles + for (LedgerHandle lh : writeHandles) { + lh.close(); + } + + // Read back and verify + for (int i = 0; i < numLedgers; i++) { + LedgerHandle readHandle = bkc.openLedger(writeHandles.get(i).getId(), DigestType.CRC32, new byte[0]); + Enumeration entries = readHandle.readEntries(0, numEntries - 1); + + for (int j = 0; j < numEntries; j++) { + LedgerEntry entry = entries.nextElement(); + assertEquals("entry-" + j, new String(entry.getEntry())); + } + readHandle.close(); + } + } + } diff --git a/conf/bk_server.conf b/conf/bk_server.conf index d680e750cd7..1b431f281b6 100644 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -402,6 +402,19 @@ journalDirectories=/tmp/bk-txn # Set the Channel Provider for journal. # The default value is # journalChannelProvider=org.apache.bookkeeper.bookie.DefaultFileChannelProvider + +# Enable hash-based selection of journals for ledgers in multi-journal setups. +# By default, ledgers are assigned to journals using ledgerId % numJournals. +# In some deployments (e.g., sharded setups), patterns in ledger IDs can cause +# uneven distribution across journals. When enabled, Fibonacci hashing is used +# for better distribution. +# +# WARNING: This setting is not backwards compatible. Changing this setting on an +# existing bookie will cause ledgers to be mapped to different journals than before, +# which can cause issues during recovery. Only enable this on new clusters or after +# careful migration planning. +# journalHashBasedSelection=false + ############################################################################# ## Ledger storage settings ############################################################################# diff --git a/site3/website/docs/reference/config.md b/site3/website/docs/reference/config.md index d9508c3488d..8658e94c6dd 100644 --- a/site3/website/docs/reference/config.md +++ b/site3/website/docs/reference/config.md @@ -123,8 +123,9 @@ The table below lists parameters that you can set to configure bookies. All conf | journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false | | journalAlignmentSize | All the journal writes and commits should be aligned to given size. If not, zeros will be padded to align to given size. | 512 | | journalBufferedEntriesThreshold | Maximum entries to buffer to impose on a journal write to achieve grouping. | | -| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false | -| journalQueueSize | Set the size of the journal queue. | 10000 | +| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false | +| journalQueueSize | Set the size of the journal queue. | 10000 | +| journalHashBasedSelection | Enable hash-based selection of journals for ledgers in multi-journal setups. By default, ledgers are assigned to journals using ledgerId % numJournals. In some deployments (e.g., sharded setups), patterns in ledger IDs can cause uneven distribution across journals. When enabled, Fibonacci hashing is used for better distribution.

**WARNING**: This setting is not backwards compatible. Changing this setting on an existing bookie will cause ledgers to be mapped to different journals than before, which can cause issues during recovery. Only enable this on new clusters or after careful migration planning. | false | ## Ledger storage settings From 044f8cdc2c1669c7c2980d585599332704545123 Mon Sep 17 00:00:00 2001 From: Justin Prieto Date: Mon, 2 Feb 2026 09:08:29 -0500 Subject: [PATCH 2/3] Add unit tests for journal hashing logic --- .../apache/bookkeeper/bookie/BookieImpl.java | 21 ++- .../bookie/JournalIndexComputationTest.java | 145 ++++++++++++++++++ 2 files changed, 162 insertions(+), 4 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 38bb5d49eee..6a03aef91ee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -953,19 +953,32 @@ LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey) private static final long FIBONACCI_HASH_CONSTANT = 0x9E3779B97F4A7C15L; /** - * Returns the journal to use for the given ledger. + * Computes the journal index for a given ledger ID. * *

When hash-based selection is enabled, uses Fibonacci hashing to distribute ledgers * across journals. The xor with the right-shifted value folds the high 32 bits into * the low 32 bits, adding more mixing prior to the modulo. + * + * @param ledgerId the ledger ID + * @param numJournals the number of journals + * @param hashBasedSelection whether to use hash-based selection + * @return the journal index (0 to numJournals-1) */ - private Journal getJournal(long ledgerId) { + @VisibleForTesting + static int computeJournalIndex(long ledgerId, int numJournals, boolean hashBasedSelection) { long index = ledgerId; - if (journalHashBasedSelection) { + if (hashBasedSelection) { index = ledgerId * FIBONACCI_HASH_CONSTANT; index ^= index >>> 32; } - return journals.get(MathUtils.signSafeMod(index, journals.size())); + return MathUtils.signSafeMod(index, numJournals); + } + + /** + * Returns the journal to use for the given ledger. + */ + private Journal getJournal(long ledgerId) { + return journals.get(computeJournalIndex(ledgerId, journals.size(), journalHashBasedSelection)); } @VisibleForTesting diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java new file mode 100644 index 00000000000..6b3b895d2f9 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Unit tests for journal index computation in BookieImpl. + * + *

Verifies that hash-based journal selection provides a near-even + * distribution of ledgers across journals for various ID patterns. + */ +@RunWith(Parameterized.class) +public class JournalIndexComputationTest { + + private static final double TOLERANCE = 0.10; + private static final int NUM_LEDGERS = 10000; + + private final int numJournals; + private final int stride; + + public JournalIndexComputationTest(int numJournals, int stride, String description) { + this.numJournals = numJournals; + this.stride = stride; + } + + @Parameterized.Parameters(name = "{2}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + // Sequential IDs (stride=1) with various journal counts + {1, 1, "1 journal"}, + {3, 1, "3 journals, sequential"}, + {4, 1, "4 journals, sequential"}, + {7, 1, "7 journals (prime), sequential"}, + {8, 1, "8 journals, sequential"}, + {15, 1, "15 journals, sequential"}, + + // Strided IDs + {4, 2, "4 journals, stride=2"}, + {4, 3, "4 journals, stride=3"}, + {4, 4, "4 journals, stride=4 (worst case without hash)"}, + {8, 8, "8 journals, stride=8 (worst case without hash)"}, + {4, 1024, "4 journals, stride=1024"}, + {8, 1024, "8 journals, stride=1024"}, + }); + } + + // Tests that ledger IDs with the given stride distribute evenly across journals + // when hash-based selection is enabled. + @Test + public void testEvenDistributionWithHashing() { + int[] counts = new int[numJournals]; + + for (int i = 0; i < NUM_LEDGERS; i++) { + long ledgerId = (long) i * stride; + int journalIndex = BookieImpl.computeJournalIndex(ledgerId, numJournals, true); + assertTrue("Index out of bounds", journalIndex >= 0 && journalIndex < numJournals); + counts[journalIndex]++; + + // Verify determinism + assertEquals("Hash not deterministic", journalIndex, + BookieImpl.computeJournalIndex(ledgerId, numJournals, true)); + } + + assertNearEvenDistribution(counts); + } + + // Tests that without hashing, journal index is computed via simple modulo + @Test + public void testSimpleModuloWithoutHashing() { + for (int i = 0; i < NUM_LEDGERS; i++) { + long ledgerId = (long) i * stride; + int journalIndex = BookieImpl.computeJournalIndex(ledgerId, numJournals, false); + assertTrue("Index out of bounds", journalIndex >= 0 && journalIndex < numJournals); + + // Verify determinism + assertEquals("Result not deterministic", journalIndex, + BookieImpl.computeJournalIndex(ledgerId, numJournals, false)); + + // Verify it matches simple modulo for non-negative IDs + int expectedIndex = (int) (ledgerId % numJournals); + assertEquals("Should match simple modulo", expectedIndex, journalIndex); + } + } + + // Tests that negative and very large ledger IDs produce valid indices for both + // hash and non-hash modes. + @Test + public void testEdgeCaseLedgerIds() { + for (boolean useHash : new boolean[] {true, false}) { + String mode = useHash ? "hash" : "modulo"; + + for (long ledgerId = -100; ledgerId < 0; ledgerId++) { + int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, useHash); + assertTrue("Negative ID produced invalid index (" + mode + ")", + idx >= 0 && idx < numJournals); + } + + for (int i = 0; i < 100; i++) { + long ledgerId = Long.MAX_VALUE - i; + int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, useHash); + assertTrue("Large ID produced invalid index (" + mode + ")", + idx >= 0 && idx < numJournals); + } + } + } + + private void assertNearEvenDistribution(int[] counts) { + int total = Arrays.stream(counts).sum(); + int expected = total / counts.length; + int maxDeviation = (int) (expected * TOLERANCE); + + for (int i = 0; i < counts.length; i++) { + int deviation = Math.abs(counts[i] - expected); + assertTrue(String.format("journal %d: %d (expected ~%d, max deviation %d)", + i, counts[i], expected, maxDeviation), + deviation <= maxDeviation); + } + } +} From fa2824460a7be35938fedd24383f4c647c0283aa Mon Sep 17 00:00:00 2001 From: Justin Prieto Date: Mon, 2 Feb 2026 10:28:30 -0500 Subject: [PATCH 3/3] naming --- .../apache/bookkeeper/bookie/JournalIndexComputationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java index 6b3b895d2f9..3a2ba5b1939 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java @@ -65,8 +65,8 @@ public static Collection parameters() { {4, 3, "4 journals, stride=3"}, {4, 4, "4 journals, stride=4 (worst case without hash)"}, {8, 8, "8 journals, stride=8 (worst case without hash)"}, - {4, 1024, "4 journals, stride=1024"}, - {8, 1024, "8 journals, stride=1024"}, + {4, 1024, "4 journals, stride=1024 (worst case without hash)"}, + {8, 1024, "8 journals, stride=1024 (worst case without hash)"}, }); }