diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index f9ac460bdef..a3a217528d9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver; import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1061,14 +1062,15 @@ private Map getReplacementBookies( } } return getReplacementBookiesByIndexes( - lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate)); + lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate), false); } - private Map getReplacementBookiesByIndexes( + @VisibleForTesting + Map getReplacementBookiesByIndexes( LedgerHandle lh, List ensemble, Set bookieIndexesToRereplicate, - Optional> excludedBookies) + Optional> excludedBookies, boolean downgradeToSelf) throws BKException.BKNotEnoughBookiesException { // target bookies to replicate Map targetBookieAddresses = @@ -1078,35 +1080,58 @@ private Map getReplacementBookiesByIndexes( if (excludedBookies.isPresent()) { bookiesToExclude.addAll(excludedBookies.get()); } + //We want to prioritize replacing offline nodes, so we put the indices of offline nodes at the beginning. + List orderedBookieIndexesToRereplicate = new LinkedList<>(); + for (Integer index : bookieIndexesToRereplicate) { + boolean alive = ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive( + ensemble.get(index)); + if (alive) { + orderedBookieIndexesToRereplicate.add(index); + } else { + orderedBookieIndexesToRereplicate.add(0, index); + } + } // excluding bookies that need to be replicated - for (Integer bookieIndex : bookieIndexesToRereplicate) { + for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { BookieId bookie = ensemble.get(bookieIndex); bookiesToExclude.add(bookie); } - + List newEnsemble = new ArrayList<>(ensemble); // allocate bookies - for (Integer bookieIndex : bookieIndexesToRereplicate) { - BookieId oldBookie = ensemble.get(bookieIndex); - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = - bkc.getPlacementPolicy().replaceBookie( - lh.getLedgerMetadata().getEnsembleSize(), - lh.getLedgerMetadata().getWriteQuorumSize(), - lh.getLedgerMetadata().getAckQuorumSize(), - lh.getLedgerMetadata().getCustomMetadata(), - ensemble, - oldBookie, - bookiesToExclude); - BookieId newBookie = replaceBookieResponse.getResult(); - PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); - if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { - LOG.debug( - "replaceBookie for bookie: {} in ensemble: {} " - + "is not adhering to placement policy and chose {}", - oldBookie, ensemble, newBookie); + for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { + BookieId oldBookie = newEnsemble.get(bookieIndex); + BookieId newBookie; + try { + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = + bkc.getPlacementPolicy().replaceBookie( + lh.getLedgerMetadata().getEnsembleSize(), + lh.getLedgerMetadata().getWriteQuorumSize(), + lh.getLedgerMetadata().getAckQuorumSize(), + lh.getLedgerMetadata().getCustomMetadata(), + newEnsemble, + oldBookie, + bookiesToExclude); + newBookie = replaceBookieResponse.getResult(); + PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = + replaceBookieResponse.getAdheringToPolicy(); + if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { + LOG.debug( + "replaceBookie for bookie: {} in ensemble: {} " + + "is not adhering to placement policy and chose {}", + oldBookie, newEnsemble, newBookie); + } + } catch (BKException.BKNotEnoughBookiesException e) { + if (downgradeToSelf && ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive( + oldBookie)) { + newBookie = oldBookie; + } else { + throw e; + } } targetBookieAddresses.put(bookieIndex, newBookie); bookiesToExclude.add(newBookie); + newEnsemble.set(bookieIndex, newBookie); } return targetBookieAddresses; @@ -1136,7 +1161,7 @@ public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledger if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) { Optional> excludedBookies = Optional.empty(); targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), - ledgerFragment.getBookiesIndexes(), excludedBookies); + ledgerFragment.getBookiesIndexes(), excludedBookies, true); } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) { targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(), lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java index 463d9599de2..df0f4b3a542 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java @@ -846,4 +846,8 @@ protected BookieNode convertBookieToNode(BookieId addr) { } return bn; } + + public boolean isAlive(BookieId bookieId) { + return knownBookies.containsKey(bookieId); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 95a7d5b40d7..37dea7be250 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -24,9 +24,13 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; @@ -47,6 +51,7 @@ import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble; import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints; import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.common.util.ReflectionUtils; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping; @@ -636,6 +641,67 @@ public void testIsEnsembleAdheringToPlacementPolicy() throws Exception { repp.isEnsembleAdheringToPlacementPolicy(ensemble, 3, 3)); } + @Test + public void testReplaceBookieWithNoMoreBookies() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3"); + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + + List ensemble = new ArrayList<>(); + ensemble.add(addr1.toBookieId()); + ensemble.add(addr2.toBookieId()); + ensemble.add(addr3.toBookieId()); + + repp.onClusterChanged(addrs, new HashSet()); + + BookKeeper bookKeeper = new BookKeeper(); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, + new ClientConfiguration(conf)); + Field field = BookKeeper.class.getDeclaredField("placementPolicy"); + field.setAccessible(true); + field.set(bookKeeper, repp); + LedgerHandle mockHandle = mock(LedgerHandle.class); + + Map> ensembles = new HashMap<>(); + ensembles.put(0L, ensemble); + LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, + Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L, + Collections.emptyMap()); + when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata); + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + + //Use downgradeToSelf, find addr2 itself. + Map replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble, + Sets.newHashSet(1), Optional.empty(), true); + assertFalse(replaceResult.isEmpty()); + assertEquals(1, replaceResult.size()); + assertEquals(addr2.toBookieId(), replaceResult.get(1)); + + //Make addr2 shutdown, then can't find replaced bookie. + addrs.remove(addr2.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + } + @Test public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index ba9c4f862ff..5e218357cd8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -27,12 +27,16 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_REGIONS_TO_WRITE; import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues; import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -42,6 +46,7 @@ import java.util.concurrent.TimeUnit; import junit.framework.TestCase; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeature; @@ -418,6 +423,68 @@ public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception { assertEquals(expectedSet, reorderSet); } + @Test + public void testReplaceBookieWithNoMoreBookie() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r2"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3"); + + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + + List ensemble = new ArrayList<>(); + ensemble.add(addr1.toBookieId()); + ensemble.add(addr2.toBookieId()); + ensemble.add(addr3.toBookieId()); + + repp.onClusterChanged(addrs, new HashSet()); + + BookKeeper bookKeeper = new BookKeeper(); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, + new ClientConfiguration(conf)); + Field field = BookKeeper.class.getDeclaredField("placementPolicy"); + field.setAccessible(true); + field.set(bookKeeper, repp); + LedgerHandle mockHandle = mock(LedgerHandle.class); + Map> ensembles = new HashMap<>(); + ensembles.put(0L, ensemble); + LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, + Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L, + Collections.emptyMap()); + when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata); + + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + + //Use downgradeToSelf, find addr2 itself. + Map replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble, + Sets.newHashSet(1), Optional.empty(), true); + assertFalse(replaceResult.isEmpty()); + assertEquals(1, replaceResult.size()); + assertEquals(addr2.toBookieId(), replaceResult.get(1)); + + //Make addr2 shutdown, then can't find replaced bookie. + addrs.remove(addr2.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + } + @Test public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java index b191225c834..ea0476680d4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java @@ -21,9 +21,13 @@ import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues; import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; import static org.junit.Assert.assertNotEquals; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +43,7 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence; import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementResult; import org.apache.bookkeeper.client.ZoneawareEnsemblePlacementPolicyImpl.ZoneAwareNodeLocation; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -857,6 +862,86 @@ public void testReplaceBookieMinUDs() throws Exception { replaceResponse.getAdheringToPolicy()); } + @Test + public void testReplaceBookieWithNoMoreBookie() throws Exception { + zepp.uninitalize(); + updateMyUpgradeDomain(NetworkTopology.DEFAULT_ZONE_AND_UPGRADEDOMAIN); + + // Update cluster + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.6", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/zone1/ud1"); + StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/zone1/ud2"); + StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/zone2/ud1"); + + ClientConfiguration newConf = new ClientConfiguration(conf); + newConf.addConfiguration(conf); + newConf.setDiskWeightBasedPlacementEnabled(true); + /* + * since BookieMaxWeightMultipleForWeightBasedPlacement is set to -1, + * there is no max cap on weight. + */ + newConf.setBookieMaxWeightMultipleForWeightBasedPlacement(-1); + newConf.setMinNumZonesPerWriteQuorum(0); + zepp.initialize(newConf, Optional. empty(), timer, DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + zepp.withDefaultFaultDomain(NetworkTopology.DEFAULT_ZONE_AND_UPGRADEDOMAIN); + + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + + List ensemble = new ArrayList<>(); + ensemble.add(addr5.toBookieId()); + ensemble.add(addr6.toBookieId()); + ensemble.add(addr7.toBookieId()); + + zepp.onClusterChanged(addrs, new HashSet()); + + BookKeeper bookKeeper = new BookKeeper(); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, + new ClientConfiguration(conf)); + Field field = BookKeeper.class.getDeclaredField("placementPolicy"); + field.setAccessible(true); + field.set(bookKeeper, zepp); + LedgerHandle mockHandle = mock(LedgerHandle.class); + + Map> ensembles = new HashMap<>(); + ensembles.put(0L, ensemble); + LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, + Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L, + Collections.emptyMap()); + when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata); + + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKException.BKNotEnoughBookiesException ignore) { + } + + //Use downgradeToSelf, find addr2 itself. + Map replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble, + Sets.newHashSet(1), Optional.empty(), true); + assertFalse(replaceResult.isEmpty()); + assertEquals(1, replaceResult.size()); + assertEquals(addr6.toBookieId(), replaceResult.get(1)); + + addrs.remove(addr6.toBookieId()); + zepp.onClusterChanged(addrs, new HashSet()); + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKException.BKNotEnoughBookiesException ignore) { + } + } + @Test public void testAreAckedBookiesAdheringToPlacementPolicy() throws Exception { zepp.uninitalize();