From e3bfef769618a323b8063732523f7668a5b73c64 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 28 Aug 2023 12:37:27 +0800 Subject: [PATCH 1/3] tmp --- .../RackawareEnsemblePlacementPolicyImpl.java | 6 +++ .../TestRackawareEnsemblePlacementPolicy.java | 49 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 6ec9e5b1589..dc5330e2df7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -814,9 +814,15 @@ public DistributionSchedule.WriteSet reorderReadSequence( BookiesHealthInfo bookiesHealthInfo, DistributionSchedule.WriteSet writeSet) { Map writeSetWithRegion = new HashMap<>(); + /* for (int i = 0; i < writeSet.size(); i++) { writeSetWithRegion.put(writeSet.get(i), ""); } + */ + writeSetWithRegion.put(writeSet.get(0), "/default-region/r1"); + writeSetWithRegion.put(writeSet.get(1), "/default-region/r1"); + writeSetWithRegion.put(writeSet.get(2), "/default-region/r2"); + writeSetWithRegion.put(writeSet.get(3), "/default-region/r3"); return reorderReadSequenceWithRegion( ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size()); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 95a7d5b40d7..656989e9515 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -1642,6 +1642,55 @@ public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws Except } } + @Test + public void testReadRequestReorder() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + updateMyRack("/default-region/r3"); + + repp = new RackawareEnsemblePlacementPolicy(); + //repp = new RegionAwareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3); + + Map bookiePendingMap = new HashMap<>(); + bookiePendingMap.put(addr1.toBookieId(), 1L); + bookiePendingMap.put(addr2.toBookieId(), 7L); + bookiePendingMap.put(addr3.toBookieId(), 1L); + bookiePendingMap.put(addr4.toBookieId(), 5L); + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), testWriteSet); + LOG.info("[hangc] reorder set: {}, orig set: {}", reorderSet, origWriteSet); + } + @Test public void testNewEnsembleWithPickDifferentRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); From b68b03f16b2df29f73ad9295395242b25276a81f Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 29 Aug 2023 00:14:45 +0800 Subject: [PATCH 2/3] Add entry read support local node rack awareness --- .../client/RackawareEnsemblePlacementPolicyImpl.java | 12 ++++-------- .../client/TestRackawareEnsemblePlacementPolicy.java | 9 +++++---- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index dc5330e2df7..6812b0fdfb2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -814,17 +814,13 @@ public DistributionSchedule.WriteSet reorderReadSequence( BookiesHealthInfo bookiesHealthInfo, DistributionSchedule.WriteSet writeSet) { Map writeSetWithRegion = new HashMap<>(); - /* for (int i = 0; i < writeSet.size(); i++) { - writeSetWithRegion.put(writeSet.get(i), ""); + int idx = writeSet.get(i); + writeSetWithRegion.put(idx, resolveNetworkLocation(ensemble.get(idx))); } - */ - writeSetWithRegion.put(writeSet.get(0), "/default-region/r1"); - writeSetWithRegion.put(writeSet.get(1), "/default-region/r1"); - writeSetWithRegion.put(writeSet.get(2), "/default-region/r2"); - writeSetWithRegion.put(writeSet.get(3), "/default-region/r3"); return reorderReadSequenceWithRegion( - ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size()); + ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, true, + localNode.getNetworkLocation(), writeSet.size()); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 656989e9515..c4748f0eed6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -223,7 +224,7 @@ public void testNodeReadOnly() throws Exception { DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( ensemble, getBookiesHealthInfo(), writeSet); LOG.info("reorder set : {}", reorderSet); - assertEquals(reorderSet, origWriteSet); + assertNotEquals(reorderSet, origWriteSet); } @Test @@ -1655,10 +1656,9 @@ public void testReadRequestReorder() throws Exception { StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); repp.uninitalize(); - updateMyRack("/default-region/r3"); + updateMyRack("/default-region/r2"); repp = new RackawareEnsemblePlacementPolicy(); - //repp = new RegionAwareEnsemblePlacementPolicy(); ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); conf.setReorderReadSequenceEnabled(true); repp.initialize(conf, Optional.empty(), timer, @@ -1688,7 +1688,8 @@ public void testReadRequestReorder() throws Exception { DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( testEnsemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), testWriteSet); - LOG.info("[hangc] reorder set: {}, orig set: {}", reorderSet, origWriteSet); + assertNotEquals(reorderSet, origWriteSet); + assertEquals(reorderSet.get(0), origWriteSet.get(2)); } @Test From 76cbd942d3b5f6b8c4d7b9e6b142e8e1390ab9ae Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 29 Aug 2023 18:57:53 +0800 Subject: [PATCH 3/3] Fix one bug and add more tests --- .../RackawareEnsemblePlacementPolicyImpl.java | 19 +- .../TestRackawareEnsemblePlacementPolicy.java | 171 +++++++++++++++++- ...estRegionAwareEnsemblePlacementPolicy.java | 94 ++++++++++ 3 files changed, 273 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 6812b0fdfb2..dc6fe64b3c5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -104,11 +104,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP static final int SLOW_MASK = 0x20 << 24; static final int UNAVAIL_MASK = 0x40 << 24; static final int MASK_BITS = 0xFFF << 20; + static final String UNKNOWN_RACK = "UnknownRack"; protected HashedWheelTimer timer; // Use a loading cache so slow bookies are expired. Use entryId as values. protected Cache slowBookies; protected BookieNode localNode; + protected String myRack; protected boolean reorderReadsRandom = false; protected boolean enforceDurability = false; protected int stabilizePeriodSeconds = 0; @@ -217,7 +219,7 @@ public Integer getSample() { LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.", localNode, null == localNode ? "Unknown" : localNode.getNetworkLocation(), dnsResolver.getClass().getName()); - + myRack = getLocalRack(localNode); this.isWeighted = isWeighted; if (this.isWeighted) { this.maxWeightMultiple = maxWeightMultiple; @@ -820,7 +822,7 @@ public DistributionSchedule.WriteSet reorderReadSequence( } return reorderReadSequenceWithRegion( ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, true, - localNode.getNetworkLocation(), writeSet.size()); + myRack, writeSet.size()); } /** @@ -860,7 +862,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion( boolean regionAware, String myRegion, int remoteNodeInReorderSequence) { - boolean useRegionAware = regionAware && (!myRegion.equals(UNKNOWN_REGION)); + boolean useRegionAware = regionAware && !UNKNOWN_REGION.equals(myRegion) && !UNKNOWN_RACK.equals(myRegion); int ensembleSize = ensemble.size(); // For rack aware, If all the bookies in the write set are available, simply return the original write set, @@ -952,7 +954,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion( long slowIdx = numPendingReqs * ensembleSize + idx; writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK); } else { - if (useRegionAware && !myRegion.equals(region)) { + if (useRegionAware && !region.equals(myRegion)) { writeSet.set(i, idx | REMOTE_MASK); } else { writeSet.set(i, idx | LOCAL_MASK); @@ -961,7 +963,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion( } else { // use bookies with earlier failed entryIds first long failIdx = lastFailedEntryOnBookie * ensembleSize + idx; - if (useRegionAware && !myRegion.equals(region)) { + if (useRegionAware && !region.equals(myRegion)) { writeSet.set(i, (int) (failIdx & ~MASK_BITS) | REMOTE_FAIL_MASK); } else { writeSet.set(i, (int) (failIdx & ~MASK_BITS) | LOCAL_FAIL_MASK); @@ -1320,4 +1322,11 @@ private BookieNode replaceToAdherePlacementPolicyInternal( } throw new BKNotEnoughBookiesException(); } + + protected String getLocalRack(BookieNode node) { + if (null == node || null == node.getAddr()) { + return UNKNOWN_RACK; + } + return node.getNetworkLocation(); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index c4748f0eed6..dad50a8ccdf 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -1644,7 +1644,7 @@ public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws Except } @Test - public void testReadRequestReorder() throws Exception { + public void testReadRequestReorderWithLocalNodeAware() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); @@ -1678,18 +1678,177 @@ public void testReadRequestReorder() throws Exception { testEnsemble.add(addr2.toBookieId()); testEnsemble.add(addr3.toBookieId()); testEnsemble.add(addr4.toBookieId()); - DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertNotEquals(reorderSet, origWriteSet); + assertEquals(reorderSet.get(0), origWriteSet.get(2)); + } + + @Test + public void testReadRequestReorderWithLocalNodeIgnore() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setIgnoreLocalNodeInPlacementPolicy(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet.equals(origWriteSet)); + } + + @Test + public void testReadRequestReorderWithLocalAndHealthInfo() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r2"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setReorderThresholdPendingRequests(10); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + testEnsemble.add(addr5.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 4, 0, 1); Map bookiePendingMap = new HashMap<>(); - bookiePendingMap.put(addr1.toBookieId(), 1L); - bookiePendingMap.put(addr2.toBookieId(), 7L); - bookiePendingMap.put(addr3.toBookieId(), 1L); - bookiePendingMap.put(addr4.toBookieId(), 5L); + bookiePendingMap.put(addr1.toBookieId(), 120L); + bookiePendingMap.put(addr2.toBookieId(), 70L); + bookiePendingMap.put(addr3.toBookieId(), 50L); + bookiePendingMap.put(addr4.toBookieId(), 209L); + bookiePendingMap.put(addr5.toBookieId(), 15L); + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( testEnsemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), testWriteSet); assertNotEquals(reorderSet, origWriteSet); + assertEquals(reorderSet.get(0), origWriteSet.get(4)); + assertEquals(reorderSet.get(1), origWriteSet.get(2)); + } + + @Test + public void testReadRequestReorderWithLocalAndHealthInfoV2() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r2"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setReorderThresholdPendingRequests(10); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + testEnsemble.add(addr5.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 4, 0, 1); + + Map bookiePendingMap = new HashMap<>(); + bookiePendingMap.put(addr1.toBookieId(), 120L); + bookiePendingMap.put(addr2.toBookieId(), 70L); + bookiePendingMap.put(addr3.toBookieId(), 100L); + bookiePendingMap.put(addr4.toBookieId(), 209L); + bookiePendingMap.put(addr5.toBookieId(), 90L); + + Map bookieFailureHistory = new HashMap<>(); + bookieFailureHistory.put(addr1.toBookieId(), 1L); + bookieFailureHistory.put(addr2.toBookieId(), 0L); + bookieFailureHistory.put(addr3.toBookieId(), 20L); + bookieFailureHistory.put(addr4.toBookieId(), 12L); + bookieFailureHistory.put(addr5.toBookieId(), 15L); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(bookieFailureHistory, bookiePendingMap), testWriteSet); + assertNotEquals(reorderSet, origWriteSet); assertEquals(reorderSet.get(0), origWriteSet.get(2)); + assertEquals(reorderSet.get(1), origWriteSet.get(4)); } @Test diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index ba9c4f862ff..b53bdce9cbd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -1880,4 +1880,98 @@ public void testNotifyRackChangeWithNewRegion() throws Exception { assertEquals("region2", repp.address2Region.get(addr3.toBookieId())); assertEquals("region3", repp.address2Region.get(addr4.toBookieId())); } + + @Test + public void testReadRequestReorderWithLocalNodeIgnore() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + + repp = new RegionAwareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setIgnoreLocalNodeInPlacementPolicy(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet.equals(origWriteSet)); + DistributionSchedule.WriteSet reorderSet1 = repp.reorderReadLACSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet1.equals(origWriteSet)); + } + + @Test + public void testReadRequestReorderWithLocalNodeAware() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RegionAwareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet.equals(origWriteSet)); + + DistributionSchedule.WriteSet reorderSet1 = repp.reorderReadLACSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet1.equals(origWriteSet)); + } + }