diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 6ec9e5b1589..dc6fe64b3c5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -104,11 +104,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP static final int SLOW_MASK = 0x20 << 24; static final int UNAVAIL_MASK = 0x40 << 24; static final int MASK_BITS = 0xFFF << 20; + static final String UNKNOWN_RACK = "UnknownRack"; protected HashedWheelTimer timer; // Use a loading cache so slow bookies are expired. Use entryId as values. protected Cache slowBookies; protected BookieNode localNode; + protected String myRack; protected boolean reorderReadsRandom = false; protected boolean enforceDurability = false; protected int stabilizePeriodSeconds = 0; @@ -217,7 +219,7 @@ public Integer getSample() { LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.", localNode, null == localNode ? "Unknown" : localNode.getNetworkLocation(), dnsResolver.getClass().getName()); - + myRack = getLocalRack(localNode); this.isWeighted = isWeighted; if (this.isWeighted) { this.maxWeightMultiple = maxWeightMultiple; @@ -815,10 +817,12 @@ public DistributionSchedule.WriteSet reorderReadSequence( DistributionSchedule.WriteSet writeSet) { Map writeSetWithRegion = new HashMap<>(); for (int i = 0; i < writeSet.size(); i++) { - writeSetWithRegion.put(writeSet.get(i), ""); + int idx = writeSet.get(i); + writeSetWithRegion.put(idx, resolveNetworkLocation(ensemble.get(idx))); } return reorderReadSequenceWithRegion( - ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size()); + ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, true, + myRack, writeSet.size()); } /** @@ -858,7 +862,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion( boolean regionAware, String myRegion, int remoteNodeInReorderSequence) { - boolean useRegionAware = regionAware && (!myRegion.equals(UNKNOWN_REGION)); + boolean useRegionAware = regionAware && !UNKNOWN_REGION.equals(myRegion) && !UNKNOWN_RACK.equals(myRegion); int ensembleSize = ensemble.size(); // For rack aware, If all the bookies in the write set are available, simply return the original write set, @@ -950,7 +954,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion( long slowIdx = numPendingReqs * ensembleSize + idx; writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK); } else { - if (useRegionAware && !myRegion.equals(region)) { + if (useRegionAware && !region.equals(myRegion)) { writeSet.set(i, idx | REMOTE_MASK); } else { writeSet.set(i, idx | LOCAL_MASK); @@ -959,7 +963,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion( } else { // use bookies with earlier failed entryIds first long failIdx = lastFailedEntryOnBookie * ensembleSize + idx; - if (useRegionAware && !myRegion.equals(region)) { + if (useRegionAware && !region.equals(myRegion)) { writeSet.set(i, (int) (failIdx & ~MASK_BITS) | REMOTE_FAIL_MASK); } else { writeSet.set(i, (int) (failIdx & ~MASK_BITS) | LOCAL_FAIL_MASK); @@ -1318,4 +1322,11 @@ private BookieNode replaceToAdherePlacementPolicyInternal( } throw new BKNotEnoughBookiesException(); } + + protected String getLocalRack(BookieNode node) { + if (null == node || null == node.getAddr()) { + return UNKNOWN_RACK; + } + return node.getNetworkLocation(); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 95a7d5b40d7..dad50a8ccdf 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -223,7 +224,7 @@ public void testNodeReadOnly() throws Exception { DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( ensemble, getBookiesHealthInfo(), writeSet); LOG.info("reorder set : {}", reorderSet); - assertEquals(reorderSet, origWriteSet); + assertNotEquals(reorderSet, origWriteSet); } @Test @@ -1642,6 +1643,214 @@ public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws Except } } + @Test + public void testReadRequestReorderWithLocalNodeAware() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertNotEquals(reorderSet, origWriteSet); + assertEquals(reorderSet.get(0), origWriteSet.get(2)); + } + + @Test + public void testReadRequestReorderWithLocalNodeIgnore() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setIgnoreLocalNodeInPlacementPolicy(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet.equals(origWriteSet)); + } + + @Test + public void testReadRequestReorderWithLocalAndHealthInfo() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r2"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setReorderThresholdPendingRequests(10); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + testEnsemble.add(addr5.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 4, 0, 1); + + Map bookiePendingMap = new HashMap<>(); + bookiePendingMap.put(addr1.toBookieId(), 120L); + bookiePendingMap.put(addr2.toBookieId(), 70L); + bookiePendingMap.put(addr3.toBookieId(), 50L); + bookiePendingMap.put(addr4.toBookieId(), 209L); + bookiePendingMap.put(addr5.toBookieId(), 15L); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), testWriteSet); + assertNotEquals(reorderSet, origWriteSet); + assertEquals(reorderSet.get(0), origWriteSet.get(4)); + assertEquals(reorderSet.get(1), origWriteSet.get(2)); + } + + @Test + public void testReadRequestReorderWithLocalAndHealthInfoV2() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r2"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RackawareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setReorderThresholdPendingRequests(10); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + testEnsemble.add(addr5.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 4, 0, 1); + + Map bookiePendingMap = new HashMap<>(); + bookiePendingMap.put(addr1.toBookieId(), 120L); + bookiePendingMap.put(addr2.toBookieId(), 70L); + bookiePendingMap.put(addr3.toBookieId(), 100L); + bookiePendingMap.put(addr4.toBookieId(), 209L); + bookiePendingMap.put(addr5.toBookieId(), 90L); + + Map bookieFailureHistory = new HashMap<>(); + bookieFailureHistory.put(addr1.toBookieId(), 1L); + bookieFailureHistory.put(addr2.toBookieId(), 0L); + bookieFailureHistory.put(addr3.toBookieId(), 20L); + bookieFailureHistory.put(addr4.toBookieId(), 12L); + bookieFailureHistory.put(addr5.toBookieId(), 15L); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(bookieFailureHistory, bookiePendingMap), testWriteSet); + assertNotEquals(reorderSet, origWriteSet); + assertEquals(reorderSet.get(0), origWriteSet.get(2)); + assertEquals(reorderSet.get(1), origWriteSet.get(4)); + } + @Test public void testNewEnsembleWithPickDifferentRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index ba9c4f862ff..b53bdce9cbd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -1880,4 +1880,98 @@ public void testNotifyRackChangeWithNewRegion() throws Exception { assertEquals("region2", repp.address2Region.get(addr3.toBookieId())); assertEquals("region3", repp.address2Region.get(addr4.toBookieId())); } + + @Test + public void testReadRequestReorderWithLocalNodeIgnore() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + + repp = new RegionAwareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + conf.setIgnoreLocalNodeInPlacementPolicy(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet.equals(origWriteSet)); + DistributionSchedule.WriteSet reorderSet1 = repp.reorderReadLACSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet1.equals(origWriteSet)); + } + + @Test + public void testReadRequestReorderWithLocalNodeAware() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + + repp.uninitalize(); + updateMyRack("/default-region/r2"); + + repp = new RegionAwareEnsemblePlacementPolicy(); + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setReorderReadSequenceEnabled(true); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + //update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + List testEnsemble = new ArrayList<>(); + testEnsemble.add(addr1.toBookieId()); + testEnsemble.add(addr2.toBookieId()); + testEnsemble.add(addr3.toBookieId()); + testEnsemble.add(addr4.toBookieId()); + DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2); + + DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy(); + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet.equals(origWriteSet)); + + DistributionSchedule.WriteSet reorderSet1 = repp.reorderReadLACSequence( + testEnsemble, getBookiesHealthInfo(), testWriteSet); + assertTrue(reorderSet1.equals(origWriteSet)); + } + }