diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 111f5134e78..6a03aef91ee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -127,6 +127,7 @@ public class BookieImpl implements Bookie {
 
     private final ByteBufAllocator allocator;
     private final boolean writeDataToJournal;
+    private final boolean journalHashBasedSelection;
 
     // Write Callback do nothing
     static class NopWriteCallback implements WriteCallback {
@@ -406,6 +407,7 @@ public BookieImpl(ServerConfiguration conf,
         this.ledgerDirsManager = ledgerDirsManager;
         this.indexDirsManager = indexDirsManager;
         this.writeDataToJournal = conf.getJournalWriteData();
+        this.journalHashBasedSelection = conf.getJournalHashBasedSelection();
         this.allocator = allocator;
         this.registrationManager = registrationManager;
         stateManager = initializeStateManager();
@@ -940,8 +942,43 @@ LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
         return handles.getHandle(ledgerId, masterKey, false);
     }
 
+    /**
+     * Constant for Fibonacci hashing. Multiplying by this constant adds randomness, breaking
+     * patterns in ledger IDs that would otherwise cause uneven journal distribution.
+     *
+     * <p>This is the same constant used as {@code GOLDEN_GAMMA} in
+     * {@link java.util.concurrent.ThreadLocalRandom} and {@link java.util.SplittableRandom}
+     * for seed mixing.
+     */
+    private static final long FIBONACCI_HASH_CONSTANT = 0x9E3779B97F4A7C15L;
+
+    /**
+     * Computes the journal index for a given ledger ID.
+     *
+     * <p>When hash-based selection is enabled, uses Fibonacci hashing to distribute ledgers
+     * across journals. The xor with the right-shifted value folds the high 32 bits into
+     * the low 32 bits, adding more mixing prior to the modulo.
+     *
+     * @param ledgerId the ledger ID
+     * @param numJournals the number of journals
+     * @param hashBasedSelection whether to use hash-based selection
+     * @return the journal index (0 to numJournals-1)
+     */
+    @VisibleForTesting
+    static int computeJournalIndex(long ledgerId, int numJournals, boolean hashBasedSelection) {
+        long index = ledgerId;
+        if (hashBasedSelection) {
+            index = ledgerId * FIBONACCI_HASH_CONSTANT;
+            index ^= index >>> 32;
+        }
+        return MathUtils.signSafeMod(index, numJournals);
+    }
+
+    /**
+     * Returns the journal to use for the given ledger.
+     */
     private Journal getJournal(long ledgerId) {
-        return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
+        return journals.get(computeJournalIndex(ledgerId, journals.size(), journalHashBasedSelection));
     }
 
     @VisibleForTesting
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 79daeb3f5d3..21747122636 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -156,6 +156,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfiguration> {
+    protected static final String JOURNAL_HASH_BASED_SELECTION = "journalHashBasedSelection";
 
+    /**
+     * Whether hash-based journal selection is enabled.
+     *
+     * <p>When enabled, ledger IDs are hashed using Fibonacci hashing before
+     * selecting a journal. In some deployments (e.g., sharded setups), patterns
+     * in ledger IDs can cause uneven distribution across journals. This setting
+     * breaks those patterns.
+     *
+     * <p>WARNING: This setting is not backwards compatible. Changing this
+     * setting on an existing bookie will cause ledgers to be mapped to
+     * different journals than before, which can cause issues during recovery.
+     * Only enable this on new clusters or after careful migration planning.
+     *
+     * <p>Default is false for backwards compatibility.
+     *
+     * @return whether hash-based journal selection is enabled
+     */
+    public boolean getJournalHashBasedSelection() {
+        return getBoolean(JOURNAL_HASH_BASED_SELECTION, false);
+    }
+
+    /**
+     * Enable or disable hash-based journal selection for ledgers.
+     *
+     * <p>See {@link #getJournalHashBasedSelection()} for details.
+     *
+     * @param enabled whether to enable hash-based journal selection
+     * @return server configuration object
+     */
+    public ServerConfiguration setJournalHashBasedSelection(boolean enabled) {
+        setProperty(JOURNAL_HASH_BASED_SELECTION, enabled);
+        return this;
+    }
+
     /**
      * Enable or disable journal syncs.
      *
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
index 664e31541bf..b0d265a1ea4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -141,4 +141,51 @@ public void testMultipleWritesAndBookieRestart() throws Exception {
         }
     }
 
+    /**
+     * Test that hash-based selection still allows correct read/write operations.
+     *
+     * This integration test verifies that when journalHashBasedSelection is enabled,
+     * entries can still be written and read back correctly.
+     */
+    @Test
+    public void testHashBasedSelectionReadWrite() throws Exception {
+        // Restart bookies with hash-based selection enabled
+        restartBookies(conf -> {
+            conf.setJournalHashBasedSelection(true);
+            return conf;
+        });
+
+        // Create ledgers and write entries
+        final int numLedgers = 8;
+        final int numEntries = 10;
+        List<LedgerHandle> writeHandles = new ArrayList<>();
+
+        for (int i = 0; i < numLedgers; i++) {
+            writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]));
+        }
+
+        for (int i = 0; i < numEntries; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+            }
+        }
+
+        // Close write handles
+        for (LedgerHandle lh : writeHandles) {
+            lh.close();
+        }
+
+        // Read back and verify
+        for (int i = 0; i < numLedgers; i++) {
+            LedgerHandle readHandle = bkc.openLedger(writeHandles.get(i).getId(), DigestType.CRC32, new byte[0]);
+            Enumeration<LedgerEntry> entries = readHandle.readEntries(0, numEntries - 1);
+
+            for (int j = 0; j < numEntries; j++) {
+                LedgerEntry entry = entries.nextElement();
+                assertEquals("entry-" + j, new String(entry.getEntry()));
+            }
+            readHandle.close();
+        }
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java
new file mode 100644
index 00000000000..3a2ba5b1939
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/JournalIndexComputationTest.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for journal index computation in BookieImpl.
+ *
+ * <p>Verifies that hash-based journal selection provides a near-even
+ * distribution of ledgers across journals for various ID patterns.
+ */
+@RunWith(Parameterized.class)
+public class JournalIndexComputationTest {
+
+    private static final double TOLERANCE = 0.10;
+    private static final int NUM_LEDGERS = 10000;
+
+    private final int numJournals;
+    private final int stride;
+
+    public JournalIndexComputationTest(int numJournals, int stride, String description) {
+        this.numJournals = numJournals;
+        this.stride = stride;
+    }
+
+    @Parameterized.Parameters(name = "{2}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(new Object[][] {
+            // Sequential IDs (stride=1) with various journal counts
+            {1, 1, "1 journal"},
+            {3, 1, "3 journals, sequential"},
+            {4, 1, "4 journals, sequential"},
+            {7, 1, "7 journals (prime), sequential"},
+            {8, 1, "8 journals, sequential"},
+            {15, 1, "15 journals, sequential"},
+
+            // Strided IDs
+            {4, 2, "4 journals, stride=2"},
+            {4, 3, "4 journals, stride=3"},
+            {4, 4, "4 journals, stride=4 (worst case without hash)"},
+            {8, 8, "8 journals, stride=8 (worst case without hash)"},
+            {4, 1024, "4 journals, stride=1024 (worst case without hash)"},
+            {8, 1024, "8 journals, stride=1024 (worst case without hash)"},
+        });
+    }
+
+    // Tests that ledger IDs with the given stride distribute evenly across journals
+    // when hash-based selection is enabled.
+    @Test
+    public void testEvenDistributionWithHashing() {
+        int[] counts = new int[numJournals];
+
+        for (int i = 0; i < NUM_LEDGERS; i++) {
+            long ledgerId = (long) i * stride;
+            int journalIndex = BookieImpl.computeJournalIndex(ledgerId, numJournals, true);
+            assertTrue("Index out of bounds", journalIndex >= 0 && journalIndex < numJournals);
+            counts[journalIndex]++;
+
+            // Verify determinism
+            assertEquals("Hash not deterministic", journalIndex,
+                    BookieImpl.computeJournalIndex(ledgerId, numJournals, true));
+        }
+
+        assertNearEvenDistribution(counts);
+    }
+
+    // Tests that without hashing, journal index is computed via simple modulo
+    @Test
+    public void testSimpleModuloWithoutHashing() {
+        for (int i = 0; i < NUM_LEDGERS; i++) {
+            long ledgerId = (long) i * stride;
+            int journalIndex = BookieImpl.computeJournalIndex(ledgerId, numJournals, false);
+            assertTrue("Index out of bounds", journalIndex >= 0 && journalIndex < numJournals);
+
+            // Verify determinism
+            assertEquals("Result not deterministic", journalIndex,
+                    BookieImpl.computeJournalIndex(ledgerId, numJournals, false));
+
+            // Verify it matches simple modulo for non-negative IDs
+            int expectedIndex = (int) (ledgerId % numJournals);
+            assertEquals("Should match simple modulo", expectedIndex, journalIndex);
+        }
+    }
+
+    // Tests that negative and very large ledger IDs produce valid indices for both
+    // hash and non-hash modes.
+    @Test
+    public void testEdgeCaseLedgerIds() {
+        for (boolean useHash : new boolean[] {true, false}) {
+            String mode = useHash ? "hash" : "modulo";
+
+            for (long ledgerId = -100; ledgerId < 0; ledgerId++) {
+                int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, useHash);
+                assertTrue("Negative ID produced invalid index (" + mode + ")",
+                        idx >= 0 && idx < numJournals);
+            }
+
+            for (int i = 0; i < 100; i++) {
+                long ledgerId = Long.MAX_VALUE - i;
+                int idx = BookieImpl.computeJournalIndex(ledgerId, numJournals, useHash);
+                assertTrue("Large ID produced invalid index (" + mode + ")",
+                        idx >= 0 && idx < numJournals);
+            }
+        }
+    }
+
+    private void assertNearEvenDistribution(int[] counts) {
+        int total = Arrays.stream(counts).sum();
+        int expected = total / counts.length;
+        int maxDeviation = (int) (expected * TOLERANCE);
+
+        for (int i = 0; i < counts.length; i++) {
+            int deviation = Math.abs(counts[i] - expected);
+            assertTrue(String.format("journal %d: %d (expected ~%d, max deviation %d)",
+                    i, counts[i], expected, maxDeviation),
+                    deviation <= maxDeviation);
+        }
+    }
+}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index d680e750cd7..1b431f281b6 100644
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -402,6 +402,19 @@ journalDirectories=/tmp/bk-txn
 # Set the Channel Provider for journal.
 # The default value is
 # journalChannelProvider=org.apache.bookkeeper.bookie.DefaultFileChannelProvider
+
+# Enable hash-based selection of journals for ledgers in multi-journal setups.
+# By default, ledgers are assigned to journals using ledgerId % numJournals.
+# In some deployments (e.g., sharded setups), patterns in ledger IDs can cause
+# uneven distribution across journals. When enabled, Fibonacci hashing is used
+# for better distribution.
+#
+# WARNING: This setting is not backwards compatible. Changing this setting on an
+# existing bookie will cause ledgers to be mapped to different journals than before,
+# which can cause issues during recovery. Only enable this on new clusters or after
+# careful migration planning.
+# journalHashBasedSelection=false
+
 #############################################################################
 ## Ledger storage settings
 #############################################################################
diff --git a/site3/website/docs/reference/config.md b/site3/website/docs/reference/config.md
index d9508c3488d..8658e94c6dd 100644
--- a/site3/website/docs/reference/config.md
+++ b/site3/website/docs/reference/config.md
@@ -123,8 +123,9 @@ The table below lists parameters that you can set to configure bookies. All conf
 | journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
 | journalAlignmentSize | All the journal writes and commits should be aligned to given size. If not, zeros will be padded to align to given size. | 512 |
 | journalBufferedEntriesThreshold | Maximum entries to buffer to impose on a journal write to achieve grouping. | |
-| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
-| journalQueueSize | Set the size of the journal queue. | 10000 |
+| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
+| journalQueueSize | Set the size of the journal queue. | 10000 |
+| journalHashBasedSelection | Enable hash-based selection of journals for ledgers in multi-journal setups. By default, ledgers are assigned to journals using ledgerId % numJournals. In some deployments (e.g., sharded setups), patterns in ledger IDs can cause uneven distribution across journals. When enabled, Fibonacci hashing is used for better distribution.<br /><br />**WARNING**: This setting is not backwards compatible. Changing this setting on an existing bookie will cause ledgers to be mapped to different journals than before, which can cause issues during recovery. Only enable this on new clusters or after careful migration planning. | false |
 
 ## Ledger storage settings
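
For reviewers who want to see the effect of the change, below is a minimal sketch (not part of the patch) that exercises the new accessors and the package-private BookieImpl.computeJournalIndex(...) added above. The class name JournalSelectionExample is hypothetical, and because computeJournalIndex is package-private the sketch assumes it lives in the org.apache.bookkeeper.bookie package.

package org.apache.bookkeeper.bookie;

import org.apache.bookkeeper.conf.ServerConfiguration;

// Illustrative only: compares the plain-modulo mapping with the Fibonacci-hash mapping.
public class JournalSelectionExample {
    public static void main(String[] args) {
        // Opting in on a new cluster's configuration (the setting defaults to false).
        ServerConfiguration conf = new ServerConfiguration();
        conf.setJournalHashBasedSelection(true);

        int numJournals = 4;
        for (long ledgerId = 0; ledgerId <= 32; ledgerId += 4) {
            // Plain modulo pins every ledger ID with this stride to journal 0 ...
            int plain = BookieImpl.computeJournalIndex(ledgerId, numJournals, false);
            // ... while Fibonacci hashing varies the target journal instead of always picking 0.
            int hashed = BookieImpl.computeJournalIndex(ledgerId, numJournals,
                    conf.getJournalHashBasedSelection());
            System.out.printf("ledger %d -> modulo: %d, hashed: %d%n", ledgerId, plain, hashed);
        }
    }
}

The stride of 4 with 4 journals is exactly the "worst case without hash" pattern that JournalIndexComputationTest parameterizes, where simple modulo sends every ledger to the same journal.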