From 10929b7a94a6e8b8dd5a5d0cc248f6b5a39703ca Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 1 Jul 2023 18:32:41 +0800 Subject: [PATCH 01/13] Support downgrade to pick replaced bookie self. --- .../LocalBookieEnsemblePlacementPolicy.java | 10 +++- .../bookkeeper/client/BookKeeperAdmin.java | 35 +++++++++----- .../DefaultEnsemblePlacementPolicy.java | 12 ++++- .../client/EnsemblePlacementPolicy.java | 33 ++++++++++++- .../RackawareEnsemblePlacementPolicyImpl.java | 47 ++++++++++++++----- .../TopologyAwareEnsemblePlacementPolicy.java | 4 ++ .../ZoneawareEnsemblePlacementPolicyImpl.java | 46 +++++++++++++++--- 7 files changed, 153 insertions(+), 34 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index 7c949cb3d4b..860f9ff4ffa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -86,9 +86,17 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum java.util.Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { + return this.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, + bookieToReplace, excludeBookies, false); + } + + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, BookieId bookieToReplace, + Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { throw new BKNotEnoughBookiesException(); } - + @Override public void registerSlowBookie(BookieId bookieSocketAddress, long entryId) { return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index f9ac460bdef..2c2c1e702f3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -1061,14 +1061,15 @@ private Map getReplacementBookies( } } return getReplacementBookiesByIndexes( - lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate)); + lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate), false); } private Map getReplacementBookiesByIndexes( LedgerHandle lh, List ensemble, Set bookieIndexesToRereplicate, - Optional> excludedBookies) + Optional> excludedBookies, + boolean downgradeToSelf) throws BKException.BKNotEnoughBookiesException { // target bookies to replicate Map targetBookieAddresses = @@ -1078,35 +1079,47 @@ private Map getReplacementBookiesByIndexes( if (excludedBookies.isPresent()) { bookiesToExclude.addAll(excludedBookies.get()); } - + //We want to prioritize replacing offline nodes, so we put the indices of offline nodes at the beginning. + List orderedBookieIndexesToRereplicate = new LinkedList<>(); + for (Integer index : bookieIndexesToRereplicate) { + boolean alive = ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive( + ensemble.get(index)); + if (alive) { + orderedBookieIndexesToRereplicate.add(index); + } else { + orderedBookieIndexesToRereplicate.add(0, index); + } + } + // excluding bookies that need to be replicated - for (Integer bookieIndex : bookieIndexesToRereplicate) { + for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { BookieId bookie = ensemble.get(bookieIndex); bookiesToExclude.add(bookie); } - + List newEnsemble = new ArrayList<>(ensemble) ; // allocate bookies - for (Integer bookieIndex : bookieIndexesToRereplicate) { - BookieId oldBookie = ensemble.get(bookieIndex); + for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { + BookieId oldBookie = newEnsemble.get(bookieIndex); EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = bkc.getPlacementPolicy().replaceBookie( lh.getLedgerMetadata().getEnsembleSize(), lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize(), lh.getLedgerMetadata().getCustomMetadata(), - ensemble, + newEnsemble, oldBookie, - bookiesToExclude); + bookiesToExclude, downgradeToSelf); BookieId newBookie = replaceBookieResponse.getResult(); PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { LOG.debug( "replaceBookie for bookie: {} in ensemble: {} " + "is not adhering to placement policy and chose {}", - oldBookie, ensemble, newBookie); + oldBookie, newEnsemble, newBookie); } targetBookieAddresses.put(bookieIndex, newBookie); bookiesToExclude.add(newBookie); + newEnsemble.set(bookieIndex, newBookie); } return targetBookieAddresses; @@ -1136,7 +1149,7 @@ public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledger if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) { Optional> excludedBookies = Optional.empty(); targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), - ledgerFragment.getBookiesIndexes(), excludedBookies); + ledgerFragment.getBookiesIndexes(), excludedBookies, true); } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) { targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(), lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java index 94cd30344f1..01065f13bb4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java @@ -123,16 +123,24 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { + return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, + bookieToReplace, excludeBookies, false); + } + + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, BookieId bookieToReplace, + Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { excludeBookies.addAll(currentEnsemble); List addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies).getResult(); - + BookieId candidateAddr = addresses.get(0); List newEnsemble = new ArrayList(currentEnsemble); newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } - + @Override public Set onClusterChanged(Set writableBookies, Set readOnlyBookies) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java index 05a687b964d..9020e28a2fd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java @@ -303,7 +303,38 @@ PlacementResult replaceBookie(int ensembleSize, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException; - + + /** + * Choose a new bookie to replace bookieToReplace. If no bookie available in the cluster, + * {@link BKNotEnoughBookiesException} is thrown. + * + *

If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the bookies belonging to default + * faultzone (rack) will be excluded while selecting bookies. + * + * @param ensembleSize + * the value of ensembleSize + * @param writeQuorumSize + * the value of writeQuorumSize + * @param ackQuorumSize the value of ackQuorumSize (added since 4.5) + * @param customMetadata the value of customMetadata. it is the same user defined metadata that user + * provides in {@link BookKeeper#createLedger(int, int, int, BookKeeper.DigestType, byte[])} + * @param currentEnsemble the value of currentEnsemble + * @param bookieToReplace bookie to replace + * @param excludeBookies bookies that should not be considered as candidate. + * @param downgradeToSelf When there are no more bookie nodes and the node to be replaced is still alive, + * choose to downgrade the node being replaced. + * @throws BKNotEnoughBookiesException + * @return a placement result containing the new bookie address. + */ + PlacementResult replaceBookie(int ensembleSize, + int writeQuorumSize, + int ackQuorumSize, + Map customMetadata, + List currentEnsemble, + BookieId bookieToReplace, + Set excludeBookies, + boolean downgradeToSelf) + throws BKNotEnoughBookiesException; /** * Register a bookie as slow so that it is tried after available and read-only bookies. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 6ec9e5b1589..ce0b39e2bec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -462,6 +462,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { + return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, + bookieToReplace, excludeBookies, false); + } + + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, BookieId bookieToReplace, + Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { rwLock.readLock().lock(); try { excludeBookies = addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies); @@ -470,28 +478,41 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (null == bn) { bn = createBookieNode(bookieToReplace); } - + Set ensembleNodes = convertBookiesToNodes(currentEnsemble); Set excludeNodes = convertBookiesToNodes(excludeBookies); - + excludeNodes.addAll(ensembleNodes); excludeNodes.add(bn); ensembleNodes.remove(bn); - + Set networkLocationsToBeExcluded = getNetworkLocations(ensembleNodes); - + if (LOG.isDebugEnabled()) { LOG.debug("Try to choose a new bookie to replace {} from ensemble {}, excluding {}.", - bookieToReplace, ensembleNodes, excludeNodes); + bookieToReplace, ensembleNodes, excludeNodes); } // pick a candidate from same rack to replace - BookieNode candidate = selectFromNetworkLocation( - bn.getNetworkLocation(), - networkLocationsToBeExcluded, - excludeNodes, - TruePredicate.INSTANCE, - EnsembleForReplacementWithNoConstraints.INSTANCE, - !enforceMinNumRacksPerWriteQuorum); + BookieNode candidate; + try { + candidate = selectFromNetworkLocation( + bn.getNetworkLocation(), + networkLocationsToBeExcluded, + excludeNodes, + TruePredicate.INSTANCE, + EnsembleForReplacementWithNoConstraints.INSTANCE, + !enforceMinNumRacksPerWriteQuorum); + } catch (BKNotEnoughBookiesException e) { + if (downgradeToSelf) { + candidate = knownBookies.get(bookieToReplace); + if (candidate == null) { + throw e; + } + } else { + throw e; + } + } + if (LOG.isDebugEnabled()) { LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn); } @@ -512,7 +533,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum rwLock.readLock().unlock(); } } - + @Override public BookieNode selectFromNetworkLocation( String networkLoc, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java index 463d9599de2..47b418501d6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java @@ -846,4 +846,8 @@ protected BookieNode convertBookieToNode(BookieId addr) { } return bn; } + + public boolean isAlive(BookieId bookieId) { + return knownBookies.containsKey(bookieId); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java index 1ce04c4be31..54e3a2aab81 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java @@ -416,6 +416,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { + return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, + bookieToReplace, excludeBookies, false); + } + + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, BookieId bookieToReplace, + Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { int bookieToReplaceIndex = currentEnsemble.indexOf(bookieToReplace); int desiredNumZonesPerWriteQuorumForThisEnsemble = (writeQuorumSize < desiredNumZonesPerWriteQuorum) ? writeQuorumSize : desiredNumZonesPerWriteQuorum; @@ -423,21 +431,47 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum rwLock.readLock().lock(); try { if (!enforceStrictZoneawarePlacement) { - return selectBookieRandomly(newEnsemble, bookieToReplace, excludeBookies, writeQuorumSize, - ackQuorumSize); + try { + return selectBookieRandomly(newEnsemble, bookieToReplace, excludeBookies, writeQuorumSize, + ackQuorumSize); + } catch (BKNotEnoughBookiesException e) { + if (downgradeToSelf) { + BookieNode bookieNode = knownBookies.get(bookieToReplace); + if (bookieNode == null) { + throw e; + } + return PlacementResult.of(bookieNode.getAddr(), + isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); + } else { + throw e; + } + } } Set comprehensiveExclusionBookiesSet = addDefaultFaultDomainBookies(excludeBookies); comprehensiveExclusionBookiesSet.addAll(currentEnsemble); - BookieId candidateAddr = setBookieInTheEnsemble(ensembleSize, writeQuorumSize, currentEnsemble, - newEnsemble, bookieToReplaceIndex, desiredNumZonesPerWriteQuorumForThisEnsemble, - comprehensiveExclusionBookiesSet); + BookieId candidateAddr; + try { + candidateAddr = setBookieInTheEnsemble(ensembleSize, writeQuorumSize, currentEnsemble, + newEnsemble, bookieToReplaceIndex, desiredNumZonesPerWriteQuorumForThisEnsemble, + comprehensiveExclusionBookiesSet); + } catch (BKNotEnoughBookiesException e) { + if (downgradeToSelf) { + BookieNode bookieNode = knownBookies.get(bookieToReplace); + if (bookieNode == null) { + throw e; + } + candidateAddr = bookieNode.getAddr(); + } else { + throw e; + } + } return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } finally { rwLock.readLock().unlock(); } } - + private PlacementResult> createNewEnsembleRandomly(List newEnsemble, int writeQuorumSize, int ackQuorumSize, Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { From 4d4b770609fae237b9821207c1d60e9af8475d01 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Mon, 3 Jul 2023 14:04:05 +0800 Subject: [PATCH 02/13] Add test case. --- .../TestRackawareEnsemblePlacementPolicy.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 95a7d5b40d7..6c81ff30631 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; import java.net.InetAddress; @@ -68,6 +69,7 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -636,6 +638,75 @@ public void testIsEnsembleAdheringToPlacementPolicy() throws Exception { repp.isEnsembleAdheringToPlacementPolicy(ensemble, 3, 3)); } + @Test + public void testReplaceBookieWithNewEnsemble() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r3"); + + ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); + conf.setMinNumRacksPerWriteQuorum(3); + + repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(conf, Optional. empty(), timer, DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<>()); + + Set bookieIndexesToRereplicate = new HashSet<>(); + bookieIndexesToRereplicate.add(1); + bookieIndexesToRereplicate.add(2); + + List ensemble = new ArrayList<>(); + ensemble.add(addr1.toBookieId()); + ensemble.add(addr2.toBookieId()); + ensemble.add(addr3.toBookieId()); + + Set bookiesToExclude = Sets.newHashSet(); + + for (Integer bookieIndex : bookieIndexesToRereplicate) { + BookieId bookie = ensemble.get(bookieIndex); + bookiesToExclude.add(bookie); + } + + List newEnsemble = new ArrayList<>(ensemble); + + int i = 0; + for (Integer bookieIndex : bookieIndexesToRereplicate) { + BookieId oldBookie = newEnsemble.get(bookieIndex); + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(3, 3, 2, + Collections.emptyMap(), newEnsemble, oldBookie, bookiesToExclude); + BookieId newBookie = replaceBookieResponse.getResult(); + PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); + if (i == 0) { + Assert.assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy); + } + if (i == 1) { + Assert.assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy); + } + bookiesToExclude.add(newBookie); + //We should update ensemble after replace. + newEnsemble.set(bookieIndex, newBookie); + i++; + } + } + @Test public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); From cc844f763ea5f2530d9b541f2a75ecc108fe1488 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Mon, 3 Jul 2023 15:29:09 +0800 Subject: [PATCH 03/13] Add test case. --- .../DefaultEnsemblePlacementPolicy.java | 27 +++++++-- .../RackawareEnsemblePlacementPolicy.java | 18 ++++++ .../RackawareEnsemblePlacementPolicyImpl.java | 14 +++-- .../RegionAwareEnsemblePlacementPolicy.java | 48 +++++++++++---- .../ZoneawareEnsemblePlacementPolicy.java | 15 ++++- .../ZoneawareEnsemblePlacementPolicyImpl.java | 12 +++- .../TestRackawareEnsemblePlacementPolicy.java | 42 +++++++++++++ ...estRegionAwareEnsemblePlacementPolicy.java | 42 +++++++++++++ .../TestZoneawareEnsemblePlacementPolicy.java | 60 +++++++++++++++++++ 9 files changed, 251 insertions(+), 27 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java index 01065f13bb4..36eabc01ea9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java @@ -126,21 +126,36 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies, false); } - + @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { excludeBookies.addAll(currentEnsemble); - List addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies).getResult(); - - BookieId candidateAddr = addresses.get(0); List newEnsemble = new ArrayList(currentEnsemble); - newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); + BookieId candidateAddr; + try { + List addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies).getResult(); + candidateAddr = addresses.get(0); + newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); + } catch (BKNotEnoughBookiesException e) { + if (downgradeToSelf) { + if (!knownBookies.contains(bookieToReplace)) { + throw e; + } + candidateAddr = bookieToReplace; + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be replaced: " + + "{} bookie is alive. Replace the bookie with itself.", bookieToReplace); + } + } else { + throw e; + } + } return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } - + @Override public Set onClusterChanged(Set writableBookies, Set readOnlyBookies) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 72858f188f7..51fede38ec1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -128,6 +128,24 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum } } + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, + BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) + throws BKException.BKNotEnoughBookiesException { + try { + return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, + currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); + } catch (BKException.BKNotEnoughBookiesException bnebe) { + if (slave == null) { + throw bnebe; + } else { + return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, + currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); + } + } + } + @Override public DistributionSchedule.WriteSet reorderReadSequence( List ensemble, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index ce0b39e2bec..630cd0a9294 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -465,7 +465,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies, false); } - + @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, List currentEnsemble, BookieId bookieToReplace, @@ -478,14 +478,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (null == bn) { bn = createBookieNode(bookieToReplace); } - + Set ensembleNodes = convertBookiesToNodes(currentEnsemble); Set excludeNodes = convertBookiesToNodes(excludeBookies); - + excludeNodes.addAll(ensembleNodes); excludeNodes.add(bn); ensembleNodes.remove(bn); - + Set networkLocationsToBeExcluded = getNetworkLocations(ensembleNodes); if (LOG.isDebugEnabled()) { @@ -508,6 +508,10 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (candidate == null) { throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } } else { throw e; } @@ -533,7 +537,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum rwLock.readLock().unlock(); } } - + @Override public BookieNode selectFromNetworkLocation( String networkLoc, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 43969b8fde3..02e2b101e6a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -490,6 +490,15 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { + return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, + bookieToReplace, excludeBookies, false); + } + + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, + BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) + throws BKException.BKNotEnoughBookiesException { rwLock.readLock().lock(); try { boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable(); @@ -498,11 +507,11 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum excludeBookies); Set excludeNodes = convertBookiesToNodes(comprehensiveExclusionBookiesSet); RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, - writeQuorumSize, - ackQuorumSize, - REGIONID_DISTANCE_FROM_LEAVES, - effectiveMinRegionsForDurability > 0 ? new HashSet(perRegionPlacement.keySet()) : null, - effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); + writeQuorumSize, + ackQuorumSize, + REGIONID_DISTANCE_FROM_LEAVES, + effectiveMinRegionsForDurability > 0 ? new HashSet(perRegionPlacement.keySet()) : null, + effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace); if (null == bookieNodeToReplace) { @@ -535,15 +544,32 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (LOG.isDebugEnabled()) { LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace, - excludeNodes); + excludeNodes); } // pick a candidate from same rack to replace - BookieNode candidate = replaceFromRack(bookieNodeToReplace, excludeNodes, - ensemble, ensemble, enforceDurability); - if (LOG.isDebugEnabled()) { - LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace); + BookieId candidateAddr; + try { + BookieNode candidate = replaceFromRack(bookieNodeToReplace, excludeNodes, + ensemble, ensemble, enforceDurability); + candidateAddr = candidate.getAddr(); + if (LOG.isDebugEnabled()) { + LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace); + } + } catch (BKException.BKNotEnoughBookiesException e) { + if (downgradeToSelf) { + BookieNode bn = knownBookies.get(bookieToReplace); + if (bn == null) { + throw e; + } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } + candidateAddr = bn.getAddr(); + } else { + throw e; + } } - BookieId candidateAddr = candidate.getAddr(); List newEnsemble = new ArrayList(currentEnsemble); if (currentEnsemble.isEmpty()) { /* diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java index cf36c7d83fe..d8de2756cd7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java @@ -100,15 +100,24 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { - try { + return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, + bookieToReplace, excludeBookies, false); + } + + @Override + public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List currentEnsemble, + BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) + throws BKException.BKNotEnoughBookiesException { + try { return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, - currentEnsemble, bookieToReplace, excludeBookies); + currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); } catch (BKException.BKNotEnoughBookiesException bnebe) { if (slave == null) { throw bnebe; } else { return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, - currentEnsemble, bookieToReplace, excludeBookies); + currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java index 54e3a2aab81..0ea8c2a3e9f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java @@ -419,7 +419,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies, false); } - + @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, List currentEnsemble, BookieId bookieToReplace, @@ -440,6 +440,10 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (bookieNode == null) { throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } return PlacementResult.of(bookieNode.getAddr(), isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } else { @@ -460,6 +464,10 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (bookieNode == null) { throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } candidateAddr = bookieNode.getAddr(); } else { throw e; @@ -471,7 +479,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum rwLock.readLock().unlock(); } } - + private PlacementResult> createNewEnsembleRandomly(List newEnsemble, int writeQuorumSize, int ackQuorumSize, Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 6c81ff30631..3fd221de1e8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -707,6 +707,48 @@ public void testReplaceBookieWithNewEnsemble() throws Exception { } } + @Test + public void testReplaceBookieWithNoMoreBookies() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3"); + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + + List ensemble = new ArrayList<>(); + ensemble.add(addr1.toBookieId()); + ensemble.add(addr2.toBookieId()); + ensemble.add(addr3.toBookieId()); + + + repp.onClusterChanged(addrs, new HashSet()); + try { + repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, + null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + BookieId replacedBookie = replaceBookieResponse.getResult(); + assertEquals(addr2.toBookieId(), replacedBookie); + + addrs.remove(addr2.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + try { + repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + } + @Test public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index ba9c4f862ff..a1dfd872355 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -418,6 +418,48 @@ public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception { assertEquals(expectedSet, reorderSet); } + @Test + public void testReplaceBookieWithNoMoreBookie() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r2"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3"); + + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + + List ensemble = new ArrayList<>(); + ensemble.add(addr1.toBookieId()); + ensemble.add(addr2.toBookieId()); + ensemble.add(addr3.toBookieId()); + + repp.onClusterChanged(addrs, new HashSet()); + try { + repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, + null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + BookieId replacedBookie = replaceBookieResponse.getResult(); + assertEquals(addr2.toBookieId(), replacedBookie); + + addrs.remove(addr2.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + try { + repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + } + @Test public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java index b191225c834..9060d44d9b0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java @@ -857,6 +857,66 @@ public void testReplaceBookieMinUDs() throws Exception { replaceResponse.getAdheringToPolicy()); } + @Test + public void testReplaceBookieWithNoMoreBookie() throws Exception { + zepp.uninitalize(); + updateMyUpgradeDomain(NetworkTopology.DEFAULT_ZONE_AND_UPGRADEDOMAIN); + + // Update cluster + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.6", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/zone1/ud1"); + StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/zone1/ud2"); + StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/zone2/ud1"); + + ClientConfiguration newConf = new ClientConfiguration(conf); + newConf.addConfiguration(conf); + newConf.setDiskWeightBasedPlacementEnabled(true); + /* + * since BookieMaxWeightMultipleForWeightBasedPlacement is set to -1, + * there is no max cap on weight. + */ + newConf.setBookieMaxWeightMultipleForWeightBasedPlacement(-1); + newConf.setMinNumZonesPerWriteQuorum(0); + zepp.initialize(newConf, Optional. empty(), timer, DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + zepp.withDefaultFaultDomain(NetworkTopology.DEFAULT_ZONE_AND_UPGRADEDOMAIN); + + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + + List ensemble = new ArrayList<>(); + ensemble.add(addr5.toBookieId()); + ensemble.add(addr6.toBookieId()); + ensemble.add(addr7.toBookieId()); + + zepp.onClusterChanged(addrs, new HashSet()); + try { + zepp.replaceBookie(1, 1, 1, null, ensemble, addr6.toBookieId(), new HashSet<>()); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKException.BKNotEnoughBookiesException ignore) { + } + + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = zepp.replaceBookie(1, 1, 1, + null, ensemble, addr6.toBookieId(), new HashSet<>(), true); + BookieId replacedBookie = replaceBookieResponse.getResult(); + assertEquals(addr6.toBookieId(), replacedBookie); + + addrs.remove(addr6.toBookieId()); + zepp.onClusterChanged(addrs, new HashSet()); + try { + zepp.replaceBookie(1, 1, 1, null, ensemble, addr6.toBookieId(), new HashSet<>(), true); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKException.BKNotEnoughBookiesException ignore) { + } + } + @Test public void testAreAckedBookiesAdheringToPlacementPolicy() throws Exception { zepp.uninitalize(); From 792f0c5a4898dd52daadf42bcb6199ee9dfcf53a Mon Sep 17 00:00:00 2001 From: horizonzy Date: Mon, 3 Jul 2023 16:00:49 +0800 Subject: [PATCH 04/13] Fix checkstyle. --- .../bookie/LocalBookieEnsemblePlacementPolicy.java | 4 ++-- .../org/apache/bookkeeper/client/BookKeeperAdmin.java | 4 ++-- .../bookkeeper/client/EnsemblePlacementPolicy.java | 2 +- .../client/RackawareEnsemblePlacementPolicyImpl.java | 2 +- .../client/TopologyAwareEnsemblePlacementPolicy.java | 2 +- .../client/TestRackawareEnsemblePlacementPolicy.java | 9 ++++----- .../client/TestRegionAwareEnsemblePlacementPolicy.java | 10 +++++----- .../client/TestZoneawareEnsemblePlacementPolicy.java | 2 +- 8 files changed, 17 insertions(+), 18 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index 860f9ff4ffa..dcba8c70eb6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -89,14 +89,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum return this.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies, false); } - + @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { throw new BKNotEnoughBookiesException(); } - + @Override public void registerSlowBookie(BookieId bookieSocketAddress, long entryId) { return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 2c2c1e702f3..117f7e22697 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -1090,13 +1090,13 @@ private Map getReplacementBookiesByIndexes( orderedBookieIndexesToRereplicate.add(0, index); } } - + // excluding bookies that need to be replicated for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { BookieId bookie = ensemble.get(bookieIndex); bookiesToExclude.add(bookie); } - List newEnsemble = new ArrayList<>(ensemble) ; + List newEnsemble = new ArrayList<>(ensemble); // allocate bookies for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { BookieId oldBookie = newEnsemble.get(bookieIndex); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java index 9020e28a2fd..61af595d439 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java @@ -303,7 +303,7 @@ PlacementResult replaceBookie(int ensembleSize, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException; - + /** * Choose a new bookie to replace bookieToReplace. If no bookie available in the cluster, * {@link BKNotEnoughBookiesException} is thrown. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 630cd0a9294..fecac313111 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -487,7 +487,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum ensembleNodes.remove(bn); Set networkLocationsToBeExcluded = getNetworkLocations(ensembleNodes); - + if (LOG.isDebugEnabled()) { LOG.debug("Try to choose a new bookie to replace {} from ensemble {}, excluding {}.", bookieToReplace, ensembleNodes, excludeNodes); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java index 47b418501d6..df0f4b3a542 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java @@ -846,7 +846,7 @@ protected BookieNode convertBookieToNode(BookieId addr) { } return bn; } - + public boolean isAlive(BookieId bookieId) { return knownBookies.containsKey(bookieId); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 3fd221de1e8..a3a38e5a98d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -721,25 +721,24 @@ public void testReplaceBookieWithNoMoreBookies() throws Exception { addrs.add(addr1.toBookieId()); addrs.add(addr2.toBookieId()); addrs.add(addr3.toBookieId()); - + List ensemble = new ArrayList<>(); ensemble.add(addr1.toBookieId()); ensemble.add(addr2.toBookieId()); ensemble.add(addr3.toBookieId()); - - + repp.onClusterChanged(addrs, new HashSet()); try { repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException ignore) { } - + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>(), true); BookieId replacedBookie = replaceBookieResponse.getResult(); assertEquals(addr2.toBookieId(), replacedBookie); - + addrs.remove(addr2.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); try { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index a1dfd872355..a0fe575787b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -427,30 +427,30 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1"); StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r2"); StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3"); - + // Update cluster Set addrs = new HashSet(); addrs.add(addr1.toBookieId()); addrs.add(addr2.toBookieId()); addrs.add(addr3.toBookieId()); - + List ensemble = new ArrayList<>(); ensemble.add(addr1.toBookieId()); ensemble.add(addr2.toBookieId()); ensemble.add(addr3.toBookieId()); - + repp.onClusterChanged(addrs, new HashSet()); try { repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException ignore) { } - + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>(), true); BookieId replacedBookie = replaceBookieResponse.getResult(); assertEquals(addr2.toBookieId(), replacedBookie); - + addrs.remove(addr2.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); try { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java index 9060d44d9b0..3ce95cdc3fb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java @@ -871,7 +871,7 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/zone1/ud1"); StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/zone1/ud2"); StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/zone2/ud1"); - + ClientConfiguration newConf = new ClientConfiguration(conf); newConf.addConfiguration(conf); newConf.setDiskWeightBasedPlacementEnabled(true); From 0545c114f4339a596370882cbc1135bfadacf2c6 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 4 Jul 2023 11:15:45 +0800 Subject: [PATCH 05/13] Update doc. --- .../org/apache/bookkeeper/client/EnsemblePlacementPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java index 61af595d439..576daec8cbd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java @@ -322,7 +322,7 @@ PlacementResult replaceBookie(int ensembleSize, * @param bookieToReplace bookie to replace * @param excludeBookies bookies that should not be considered as candidate. * @param downgradeToSelf When there are no more bookie nodes and the node to be replaced is still alive, - * choose to downgrade the node being replaced. + * downgrade to choose the node being replaced itself. * @throws BKNotEnoughBookiesException * @return a placement result containing the new bookie address. */ From c669e80c899600081565163373cb3f9c88d70e78 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 6 Jul 2023 00:19:07 +0800 Subject: [PATCH 06/13] Make downgrade to self as the default behavior. --- .../LocalBookieEnsemblePlacementPolicy.java | 9 +--- .../bookkeeper/client/BookKeeperAdmin.java | 9 ++-- .../DefaultEnsemblePlacementPolicy.java | 27 +++------- .../client/EnsemblePlacementPolicy.java | 31 ----------- .../RackawareEnsemblePlacementPolicy.java | 20 +------ .../RackawareEnsemblePlacementPolicyImpl.java | 24 +++------ .../RegionAwareEnsemblePlacementPolicy.java | 27 +++------- .../ZoneawareEnsemblePlacementPolicy.java | 20 +------ .../ZoneawareEnsemblePlacementPolicyImpl.java | 52 +++++++------------ .../TestRackawareEnsemblePlacementPolicy.java | 9 +--- ...estRegionAwareEnsemblePlacementPolicy.java | 9 +--- .../TestZoneawareEnsemblePlacementPolicy.java | 10 +--- 12 files changed, 51 insertions(+), 196 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index dcba8c70eb6..6c349991fc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -87,14 +87,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { return this.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies, false); - } - - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, BookieId bookieToReplace, - Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { - throw new BKNotEnoughBookiesException(); + bookieToReplace, excludeBookies); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 117f7e22697..d8c17126455 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -1061,15 +1061,14 @@ private Map getReplacementBookies( } } return getReplacementBookiesByIndexes( - lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate), false); + lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate)); } private Map getReplacementBookiesByIndexes( LedgerHandle lh, List ensemble, Set bookieIndexesToRereplicate, - Optional> excludedBookies, - boolean downgradeToSelf) + Optional> excludedBookies) throws BKException.BKNotEnoughBookiesException { // target bookies to replicate Map targetBookieAddresses = @@ -1108,7 +1107,7 @@ private Map getReplacementBookiesByIndexes( lh.getLedgerMetadata().getCustomMetadata(), newEnsemble, oldBookie, - bookiesToExclude, downgradeToSelf); + bookiesToExclude); BookieId newBookie = replaceBookieResponse.getResult(); PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { @@ -1149,7 +1148,7 @@ public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledger if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) { Optional> excludedBookies = Optional.empty(); targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), - ledgerFragment.getBookiesIndexes(), excludedBookies, true); + ledgerFragment.getBookiesIndexes(), excludedBookies); } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) { targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(), lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java index 36eabc01ea9..06b062eccd1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java @@ -118,19 +118,10 @@ public PlacementResult> newEnsemble(int ensembleSize, int quorumS throw new BKNotEnoughBookiesException(); } - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, - BookieId bookieToReplace, Set excludeBookies) - throws BKNotEnoughBookiesException { - return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies, false); - } - @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, List currentEnsemble, BookieId bookieToReplace, - Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { + Set excludeBookies) throws BKNotEnoughBookiesException { excludeBookies.addAll(currentEnsemble); List newEnsemble = new ArrayList(currentEnsemble); BookieId candidateAddr; @@ -139,18 +130,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum candidateAddr = addresses.get(0); newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); } catch (BKNotEnoughBookiesException e) { - if (downgradeToSelf) { - if (!knownBookies.contains(bookieToReplace)) { - throw e; - } - candidateAddr = bookieToReplace; - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be replaced: " - + "{} bookie is alive. Replace the bookie with itself.", bookieToReplace); - } - } else { + if (!knownBookies.contains(bookieToReplace)) { throw e; } + candidateAddr = bookieToReplace; + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be replaced: " + + "{} bookie is alive. Replace the bookie with itself.", bookieToReplace); + } } return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java index 576daec8cbd..05a687b964d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java @@ -304,37 +304,6 @@ PlacementResult replaceBookie(int ensembleSize, Set excludeBookies) throws BKNotEnoughBookiesException; - /** - * Choose a new bookie to replace bookieToReplace. If no bookie available in the cluster, - * {@link BKNotEnoughBookiesException} is thrown. - * - *

If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the bookies belonging to default - * faultzone (rack) will be excluded while selecting bookies. - * - * @param ensembleSize - * the value of ensembleSize - * @param writeQuorumSize - * the value of writeQuorumSize - * @param ackQuorumSize the value of ackQuorumSize (added since 4.5) - * @param customMetadata the value of customMetadata. it is the same user defined metadata that user - * provides in {@link BookKeeper#createLedger(int, int, int, BookKeeper.DigestType, byte[])} - * @param currentEnsemble the value of currentEnsemble - * @param bookieToReplace bookie to replace - * @param excludeBookies bookies that should not be considered as candidate. - * @param downgradeToSelf When there are no more bookie nodes and the node to be replaced is still alive, - * downgrade to choose the node being replaced itself. - * @throws BKNotEnoughBookiesException - * @return a placement result containing the new bookie address. - */ - PlacementResult replaceBookie(int ensembleSize, - int writeQuorumSize, - int ackQuorumSize, - Map customMetadata, - List currentEnsemble, - BookieId bookieToReplace, - Set excludeBookies, - boolean downgradeToSelf) - throws BKNotEnoughBookiesException; /** * Register a bookie as slow so that it is tried after available and read-only bookies. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 51fede38ec1..0101db6092d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -115,7 +115,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { - try { + try { return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies); } catch (BKException.BKNotEnoughBookiesException bnebe) { @@ -128,24 +128,6 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum } } - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, - BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) - throws BKException.BKNotEnoughBookiesException { - try { - return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, - currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); - } catch (BKException.BKNotEnoughBookiesException bnebe) { - if (slave == null) { - throw bnebe; - } else { - return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, - currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); - } - } - } - @Override public DistributionSchedule.WriteSet reorderReadSequence( List ensemble, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index fecac313111..d105e971a23 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -462,14 +462,6 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { - return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies, false); - } - - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, BookieId bookieToReplace, - Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { rwLock.readLock().lock(); try { excludeBookies = addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies); @@ -503,18 +495,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum EnsembleForReplacementWithNoConstraints.INSTANCE, !enforceMinNumRacksPerWriteQuorum); } catch (BKNotEnoughBookiesException e) { - if (downgradeToSelf) { - candidate = knownBookies.get(bookieToReplace); - if (candidate == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - } else { + candidate = knownBookies.get(bookieToReplace); + if (candidate == null) { throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } } if (LOG.isDebugEnabled()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 02e2b101e6a..391c197c882 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -490,15 +490,6 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { - return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies, false); - } - - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, - BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) - throws BKException.BKNotEnoughBookiesException { rwLock.readLock().lock(); try { boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable(); @@ -556,19 +547,15 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace); } } catch (BKException.BKNotEnoughBookiesException e) { - if (downgradeToSelf) { - BookieNode bn = knownBookies.get(bookieToReplace); - if (bn == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - candidateAddr = bn.getAddr(); - } else { + BookieNode bn = knownBookies.get(bookieToReplace); + if (bn == null) { throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } + candidateAddr = bn.getAddr(); } List newEnsemble = new ArrayList(currentEnsemble); if (currentEnsemble.isEmpty()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java index d8de2756cd7..0a11c59dae7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java @@ -101,25 +101,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies, false); - } - - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, - BookieId bookieToReplace, Set excludeBookies, boolean downgradeToSelf) - throws BKException.BKNotEnoughBookiesException { - try { - return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, - currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); - } catch (BKException.BKNotEnoughBookiesException bnebe) { - if (slave == null) { - throw bnebe; - } else { - return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, - currentEnsemble, bookieToReplace, excludeBookies, downgradeToSelf); - } - } + bookieToReplace, excludeBookies); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java index 0ea8c2a3e9f..28cb9a914a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java @@ -411,19 +411,11 @@ public PlacementResult> newEnsemble(int ensembleSize, int writeQu } } - @Override - public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, - BookieId bookieToReplace, Set excludeBookies) - throws BKNotEnoughBookiesException { - return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies, false); - } - @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, List currentEnsemble, BookieId bookieToReplace, - Set excludeBookies, boolean downgradeToSelf) throws BKNotEnoughBookiesException { + Set excludeBookies) + throws BKNotEnoughBookiesException { int bookieToReplaceIndex = currentEnsemble.indexOf(bookieToReplace); int desiredNumZonesPerWriteQuorumForThisEnsemble = (writeQuorumSize < desiredNumZonesPerWriteQuorum) ? writeQuorumSize : desiredNumZonesPerWriteQuorum; @@ -435,20 +427,16 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum return selectBookieRandomly(newEnsemble, bookieToReplace, excludeBookies, writeQuorumSize, ackQuorumSize); } catch (BKNotEnoughBookiesException e) { - if (downgradeToSelf) { - BookieNode bookieNode = knownBookies.get(bookieToReplace); - if (bookieNode == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - return PlacementResult.of(bookieNode.getAddr(), - isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); - } else { + BookieNode bookieNode = knownBookies.get(bookieToReplace); + if (bookieNode == null) { throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); + } + return PlacementResult.of(bookieNode.getAddr(), + isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } } Set comprehensiveExclusionBookiesSet = addDefaultFaultDomainBookies(excludeBookies); @@ -459,19 +447,15 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum newEnsemble, bookieToReplaceIndex, desiredNumZonesPerWriteQuorumForThisEnsemble, comprehensiveExclusionBookiesSet); } catch (BKNotEnoughBookiesException e) { - if (downgradeToSelf) { - BookieNode bookieNode = knownBookies.get(bookieToReplace); - if (bookieNode == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - candidateAddr = bookieNode.getAddr(); - } else { - throw e; + BookieNode bookieNode = knownBookies.get(bookieToReplace); + if (bookieNode == null) { + throw e; + } + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more available bookies to replace, and the waiting to be " + + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); } + candidateAddr = bookieNode.getAddr(); } return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index a3a38e5a98d..293c0a76dfd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -728,21 +728,16 @@ public void testReplaceBookieWithNoMoreBookies() throws Exception { ensemble.add(addr3.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); - try { - repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); - fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); - } catch (BKNotEnoughBookiesException ignore) { - } EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, - null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + null, ensemble, addr2.toBookieId(), new HashSet<>()); BookieId replacedBookie = replaceBookieResponse.getResult(); assertEquals(addr2.toBookieId(), replacedBookie); addrs.remove(addr2.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); try { - repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException ignore) { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index a0fe575787b..212282daa0e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -440,21 +440,16 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { ensemble.add(addr3.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); - try { - repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); - fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); - } catch (BKNotEnoughBookiesException ignore) { - } EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, - null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + null, ensemble, addr2.toBookieId(), new HashSet<>()); BookieId replacedBookie = replaceBookieResponse.getResult(); assertEquals(addr2.toBookieId(), replacedBookie); addrs.remove(addr2.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); try { - repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>(), true); + repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException ignore) { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java index 3ce95cdc3fb..5f4e134db33 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java @@ -897,21 +897,15 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { ensemble.add(addr7.toBookieId()); zepp.onClusterChanged(addrs, new HashSet()); - try { - zepp.replaceBookie(1, 1, 1, null, ensemble, addr6.toBookieId(), new HashSet<>()); - fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); - } catch (BKException.BKNotEnoughBookiesException ignore) { - } - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = zepp.replaceBookie(1, 1, 1, - null, ensemble, addr6.toBookieId(), new HashSet<>(), true); + null, ensemble, addr6.toBookieId(), new HashSet<>()); BookieId replacedBookie = replaceBookieResponse.getResult(); assertEquals(addr6.toBookieId(), replacedBookie); addrs.remove(addr6.toBookieId()); zepp.onClusterChanged(addrs, new HashSet()); try { - zepp.replaceBookie(1, 1, 1, null, ensemble, addr6.toBookieId(), new HashSet<>(), true); + zepp.replaceBookie(1, 1, 1, null, ensemble, addr6.toBookieId(), new HashSet<>()); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKException.BKNotEnoughBookiesException ignore) { } From 5792c6502c4d77958994ff3cf3cede4dd0dd2401 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 6 Jul 2023 00:59:59 +0800 Subject: [PATCH 07/13] Fix bugs. --- .../bookie/LocalBookieEnsemblePlacementPolicy.java | 3 +-- .../client/RackawareEnsemblePlacementPolicy.java | 2 +- .../client/ZoneawareEnsemblePlacementPolicy.java | 13 +++++++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index 6c349991fc8..7c949cb3d4b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -86,8 +86,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum java.util.Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { - return this.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies); + throw new BKNotEnoughBookiesException(); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 0101db6092d..72858f188f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -115,7 +115,7 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { - try { + try { return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies); } catch (BKException.BKNotEnoughBookiesException bnebe) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java index 0a11c59dae7..cf36c7d83fe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicy.java @@ -100,8 +100,17 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum Map customMetadata, List currentEnsemble, BookieId bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { - return replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, - bookieToReplace, excludeBookies); + try { + return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, + currentEnsemble, bookieToReplace, excludeBookies); + } catch (BKException.BKNotEnoughBookiesException bnebe) { + if (slave == null) { + throw bnebe; + } else { + return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, + currentEnsemble, bookieToReplace, excludeBookies); + } + } } @Override From dafdf6702b9d1cfa22b830f32f38bc49b7619abd Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sun, 9 Jul 2023 08:43:18 +0800 Subject: [PATCH 08/13] Fix ci. --- .../client/TestRegionAwareEnsemblePlacementPolicy.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 212282daa0e..18b11cdd06d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -559,12 +559,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws Exception { excludedAddrs.add(addr1.toBookieId()); excludedAddrs.add(addr3.toBookieId()); excludedAddrs.add(addr4.toBookieId()); - try { - repp.replaceBookie(1, 1, 1, null, new ArrayList(), addr2.toBookieId(), excludedAddrs); - fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); - } catch (BKNotEnoughBookiesException bnebe) { - // should throw not enou - } + repp.replaceBookie(1, 1, 1, null, new ArrayList(), addr2.toBookieId(), excludedAddrs); } @Test From 65e114d46f19fc4393e48342ce76be62f90f6701 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 15 Jul 2023 11:00:13 +0800 Subject: [PATCH 09/13] Make downgrade to self only works in ReplicationWorker. --- .../bookkeeper/client/BookKeeperAdmin.java | 48 +++++++++++-------- .../DefaultEnsemblePlacementPolicy.java | 24 +++------- .../RackawareEnsemblePlacementPolicyImpl.java | 29 ++++------- .../RegionAwareEnsemblePlacementPolicy.java | 35 +++++--------- .../ZoneawareEnsemblePlacementPolicyImpl.java | 40 +++------------- 5 files changed, 62 insertions(+), 114 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index d8c17126455..3af0a306694 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -1061,14 +1061,14 @@ private Map getReplacementBookies( } } return getReplacementBookiesByIndexes( - lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate)); + lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate), false); } private Map getReplacementBookiesByIndexes( LedgerHandle lh, List ensemble, Set bookieIndexesToRereplicate, - Optional> excludedBookies) + Optional> excludedBookies, boolean downUpgradeToSelf) throws BKException.BKNotEnoughBookiesException { // target bookies to replicate Map targetBookieAddresses = @@ -1099,22 +1099,32 @@ private Map getReplacementBookiesByIndexes( // allocate bookies for (Integer bookieIndex : orderedBookieIndexesToRereplicate) { BookieId oldBookie = newEnsemble.get(bookieIndex); - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = - bkc.getPlacementPolicy().replaceBookie( - lh.getLedgerMetadata().getEnsembleSize(), - lh.getLedgerMetadata().getWriteQuorumSize(), - lh.getLedgerMetadata().getAckQuorumSize(), - lh.getLedgerMetadata().getCustomMetadata(), - newEnsemble, - oldBookie, - bookiesToExclude); - BookieId newBookie = replaceBookieResponse.getResult(); - PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); - if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { - LOG.debug( - "replaceBookie for bookie: {} in ensemble: {} " - + "is not adhering to placement policy and chose {}", - oldBookie, newEnsemble, newBookie); + BookieId newBookie; + try { + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = + bkc.getPlacementPolicy().replaceBookie( + lh.getLedgerMetadata().getEnsembleSize(), + lh.getLedgerMetadata().getWriteQuorumSize(), + lh.getLedgerMetadata().getAckQuorumSize(), + lh.getLedgerMetadata().getCustomMetadata(), + newEnsemble, + oldBookie, + bookiesToExclude); + newBookie = replaceBookieResponse.getResult(); + PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); + if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { + LOG.debug( + "replaceBookie for bookie: {} in ensemble: {} " + + "is not adhering to placement policy and chose {}", + oldBookie, newEnsemble, newBookie); + } + } catch (BKException.BKNotEnoughBookiesException e) { + if (downUpgradeToSelf && ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive( + oldBookie)) { + newBookie = oldBookie; + } else { + throw e; + } } targetBookieAddresses.put(bookieIndex, newBookie); bookiesToExclude.add(newBookie); @@ -1148,7 +1158,7 @@ public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledger if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) { Optional> excludedBookies = Optional.empty(); targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), - ledgerFragment.getBookiesIndexes(), excludedBookies); + ledgerFragment.getBookiesIndexes(), excludedBookies, true); } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) { targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(), lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java index 06b062eccd1..94cd30344f1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java @@ -120,25 +120,15 @@ public PlacementResult> newEnsemble(int ensembleSize, int quorumS @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, BookieId bookieToReplace, - Set excludeBookies) throws BKNotEnoughBookiesException { + Map customMetadata, List currentEnsemble, + BookieId bookieToReplace, Set excludeBookies) + throws BKNotEnoughBookiesException { excludeBookies.addAll(currentEnsemble); + List addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies).getResult(); + + BookieId candidateAddr = addresses.get(0); List newEnsemble = new ArrayList(currentEnsemble); - BookieId candidateAddr; - try { - List addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies).getResult(); - candidateAddr = addresses.get(0); - newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); - } catch (BKNotEnoughBookiesException e) { - if (!knownBookies.contains(bookieToReplace)) { - throw e; - } - candidateAddr = bookieToReplace; - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be replaced: " - + "{} bookie is alive. Replace the bookie with itself.", bookieToReplace); - } - } + newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index d105e971a23..6ec9e5b1589 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -482,29 +482,16 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (LOG.isDebugEnabled()) { LOG.debug("Try to choose a new bookie to replace {} from ensemble {}, excluding {}.", - bookieToReplace, ensembleNodes, excludeNodes); + bookieToReplace, ensembleNodes, excludeNodes); } // pick a candidate from same rack to replace - BookieNode candidate; - try { - candidate = selectFromNetworkLocation( - bn.getNetworkLocation(), - networkLocationsToBeExcluded, - excludeNodes, - TruePredicate.INSTANCE, - EnsembleForReplacementWithNoConstraints.INSTANCE, - !enforceMinNumRacksPerWriteQuorum); - } catch (BKNotEnoughBookiesException e) { - candidate = knownBookies.get(bookieToReplace); - if (candidate == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - } - + BookieNode candidate = selectFromNetworkLocation( + bn.getNetworkLocation(), + networkLocationsToBeExcluded, + excludeNodes, + TruePredicate.INSTANCE, + EnsembleForReplacementWithNoConstraints.INSTANCE, + !enforceMinNumRacksPerWriteQuorum); if (LOG.isDebugEnabled()) { LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 391c197c882..43969b8fde3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -498,11 +498,11 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum excludeBookies); Set excludeNodes = convertBookiesToNodes(comprehensiveExclusionBookiesSet); RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, - writeQuorumSize, - ackQuorumSize, - REGIONID_DISTANCE_FROM_LEAVES, - effectiveMinRegionsForDurability > 0 ? new HashSet(perRegionPlacement.keySet()) : null, - effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); + writeQuorumSize, + ackQuorumSize, + REGIONID_DISTANCE_FROM_LEAVES, + effectiveMinRegionsForDurability > 0 ? new HashSet(perRegionPlacement.keySet()) : null, + effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace); if (null == bookieNodeToReplace) { @@ -535,28 +535,15 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum if (LOG.isDebugEnabled()) { LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace, - excludeNodes); + excludeNodes); } // pick a candidate from same rack to replace - BookieId candidateAddr; - try { - BookieNode candidate = replaceFromRack(bookieNodeToReplace, excludeNodes, - ensemble, ensemble, enforceDurability); - candidateAddr = candidate.getAddr(); - if (LOG.isDebugEnabled()) { - LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace); - } - } catch (BKException.BKNotEnoughBookiesException e) { - BookieNode bn = knownBookies.get(bookieToReplace); - if (bn == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - candidateAddr = bn.getAddr(); + BookieNode candidate = replaceFromRack(bookieNodeToReplace, excludeNodes, + ensemble, ensemble, enforceDurability); + if (LOG.isDebugEnabled()) { + LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace); } + BookieId candidateAddr = candidate.getAddr(); List newEnsemble = new ArrayList(currentEnsemble); if (currentEnsemble.isEmpty()) { /* diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java index 28cb9a914a2..1ce04c4be31 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java @@ -413,8 +413,8 @@ public PlacementResult> newEnsemble(int ensembleSize, int writeQu @Override public PlacementResult replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Map customMetadata, List currentEnsemble, BookieId bookieToReplace, - Set excludeBookies) + Map customMetadata, List currentEnsemble, + BookieId bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { int bookieToReplaceIndex = currentEnsemble.indexOf(bookieToReplace); int desiredNumZonesPerWriteQuorumForThisEnsemble = (writeQuorumSize < desiredNumZonesPerWriteQuorum) @@ -423,40 +423,14 @@ public PlacementResult replaceBookie(int ensembleSize, int writeQuorum rwLock.readLock().lock(); try { if (!enforceStrictZoneawarePlacement) { - try { - return selectBookieRandomly(newEnsemble, bookieToReplace, excludeBookies, writeQuorumSize, - ackQuorumSize); - } catch (BKNotEnoughBookiesException e) { - BookieNode bookieNode = knownBookies.get(bookieToReplace); - if (bookieNode == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - return PlacementResult.of(bookieNode.getAddr(), - isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); - } + return selectBookieRandomly(newEnsemble, bookieToReplace, excludeBookies, writeQuorumSize, + ackQuorumSize); } Set comprehensiveExclusionBookiesSet = addDefaultFaultDomainBookies(excludeBookies); comprehensiveExclusionBookiesSet.addAll(currentEnsemble); - BookieId candidateAddr; - try { - candidateAddr = setBookieInTheEnsemble(ensembleSize, writeQuorumSize, currentEnsemble, - newEnsemble, bookieToReplaceIndex, desiredNumZonesPerWriteQuorumForThisEnsemble, - comprehensiveExclusionBookiesSet); - } catch (BKNotEnoughBookiesException e) { - BookieNode bookieNode = knownBookies.get(bookieToReplace); - if (bookieNode == null) { - throw e; - } - if (LOG.isDebugEnabled()) { - LOG.debug("There is no more available bookies to replace, and the waiting to be " - + "replaced bookie: {} is alive. Replace the bookie with itself.", bookieToReplace); - } - candidateAddr = bookieNode.getAddr(); - } + BookieId candidateAddr = setBookieInTheEnsemble(ensembleSize, writeQuorumSize, currentEnsemble, + newEnsemble, bookieToReplaceIndex, desiredNumZonesPerWriteQuorumForThisEnsemble, + comprehensiveExclusionBookiesSet); return PlacementResult.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); } finally { From adb2f240f59f58680ac3d0daaab01deb027ee7ad Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 15 Jul 2023 12:57:12 +0800 Subject: [PATCH 10/13] Tune ci. --- .../bookkeeper/client/BookKeeperAdmin.java | 8 +- .../TestRackawareEnsemblePlacementPolicy.java | 108 ++++++------------ ...estRegionAwareEnsemblePlacementPolicy.java | 40 ++++++- .../TestZoneawareEnsemblePlacementPolicy.java | 41 ++++++- 4 files changed, 110 insertions(+), 87 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 3af0a306694..0cbe0c215ad 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver; import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1064,11 +1065,12 @@ private Map getReplacementBookies( lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate), false); } - private Map getReplacementBookiesByIndexes( + @VisibleForTesting + Map getReplacementBookiesByIndexes( LedgerHandle lh, List ensemble, Set bookieIndexesToRereplicate, - Optional> excludedBookies, boolean downUpgradeToSelf) + Optional> excludedBookies, boolean downgradeToSelf) throws BKException.BKNotEnoughBookiesException { // target bookies to replicate Map targetBookieAddresses = @@ -1119,7 +1121,7 @@ private Map getReplacementBookiesByIndexes( oldBookie, newEnsemble, newBookie); } } catch (BKException.BKNotEnoughBookiesException e) { - if (downUpgradeToSelf && ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive( + if (downgradeToSelf && ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive( oldBookie)) { newBookie = oldBookie; } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 293c0a76dfd..297dd7aea5f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -24,10 +24,14 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; + +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; @@ -48,6 +52,7 @@ import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble; import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints; import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.common.util.ReflectionUtils; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping; @@ -638,75 +643,6 @@ public void testIsEnsembleAdheringToPlacementPolicy() throws Exception { repp.isEnsembleAdheringToPlacementPolicy(ensemble, 3, 3)); } - @Test - public void testReplaceBookieWithNewEnsemble() throws Exception { - BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); - BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); - BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); - BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); - BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); - // update dns mapping - StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); - StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); - StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r1"); - StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r2"); - StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r3"); - - ClientConfiguration conf = (ClientConfiguration) this.conf.clone(); - conf.setMinNumRacksPerWriteQuorum(3); - - repp = new RackawareEnsemblePlacementPolicy(); - repp.initialize(conf, Optional. empty(), timer, DISABLE_ALL, - NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); - repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); - - // Update cluster - Set addrs = new HashSet(); - addrs.add(addr1.toBookieId()); - addrs.add(addr2.toBookieId()); - addrs.add(addr3.toBookieId()); - addrs.add(addr4.toBookieId()); - addrs.add(addr5.toBookieId()); - repp.onClusterChanged(addrs, new HashSet<>()); - - Set bookieIndexesToRereplicate = new HashSet<>(); - bookieIndexesToRereplicate.add(1); - bookieIndexesToRereplicate.add(2); - - List ensemble = new ArrayList<>(); - ensemble.add(addr1.toBookieId()); - ensemble.add(addr2.toBookieId()); - ensemble.add(addr3.toBookieId()); - - Set bookiesToExclude = Sets.newHashSet(); - - for (Integer bookieIndex : bookieIndexesToRereplicate) { - BookieId bookie = ensemble.get(bookieIndex); - bookiesToExclude.add(bookie); - } - - List newEnsemble = new ArrayList<>(ensemble); - - int i = 0; - for (Integer bookieIndex : bookieIndexesToRereplicate) { - BookieId oldBookie = newEnsemble.get(bookieIndex); - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(3, 3, 2, - Collections.emptyMap(), newEnsemble, oldBookie, bookiesToExclude); - BookieId newBookie = replaceBookieResponse.getResult(); - PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); - if (i == 0) { - Assert.assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy); - } - if (i == 1) { - Assert.assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy); - } - bookiesToExclude.add(newBookie); - //We should update ensemble after replace. - newEnsemble.set(bookieIndex, newBookie); - i++; - } - } - @Test public void testReplaceBookieWithNoMoreBookies() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); @@ -729,15 +665,39 @@ public void testReplaceBookieWithNoMoreBookies() throws Exception { repp.onClusterChanged(addrs, new HashSet()); - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, - null, ensemble, addr2.toBookieId(), new HashSet<>()); - BookieId replacedBookie = replaceBookieResponse.getResult(); - assertEquals(addr2.toBookieId(), replacedBookie); + BookKeeper bookKeeper = new BookKeeper(); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(conf)); + Field field = BookKeeper.class.getDeclaredField("placementPolicy"); + field.setAccessible(true); + field.set(bookKeeper, repp); + LedgerHandle mockHandle = mock(LedgerHandle.class); + + Map> ensembles = new HashMap<>(); + ensembles.put(0L, ensemble); + LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, + Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L, + Collections.emptyMap()); + when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata); + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + + //Use downgradeToSelf, find addr2 itself. + Map replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble, + Sets.newHashSet(1), Optional.empty(), true); + assertFalse(replaceResult.isEmpty()); + assertEquals(1, replaceResult.size()); + assertEquals(addr2.toBookieId(), replaceResult.get(1)); + //Make addr2 shutdown, then can't find replaced bookie. addrs.remove(addr2.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); + try { - repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException ignore) { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 18b11cdd06d..d9b9f41a13e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -27,12 +27,17 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_REGIONS_TO_WRITE; import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues; import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; + +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -42,6 +47,7 @@ import java.util.concurrent.TimeUnit; import junit.framework.TestCase; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeature; @@ -441,15 +447,39 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { repp.onClusterChanged(addrs, new HashSet()); - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = repp.replaceBookie(1, 1, 1, - null, ensemble, addr2.toBookieId(), new HashSet<>()); - BookieId replacedBookie = replaceBookieResponse.getResult(); - assertEquals(addr2.toBookieId(), replacedBookie); + BookKeeper bookKeeper = new BookKeeper(); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(conf)); + Field field = BookKeeper.class.getDeclaredField("placementPolicy"); + field.setAccessible(true); + field.set(bookKeeper, repp); + LedgerHandle mockHandle = mock(LedgerHandle.class); + Map> ensembles = new HashMap<>(); + ensembles.put(0L, ensemble); + LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, + Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L, + Collections.emptyMap()); + when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata); + + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException ignore) { + } + + //Use downgradeToSelf, find addr2 itself. + Map replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble, + Sets.newHashSet(1), Optional.empty(), true); + assertFalse(replaceResult.isEmpty()); + assertEquals(1, replaceResult.size()); + assertEquals(addr2.toBookieId(), replaceResult.get(1)); + //Make addr2 shutdown, then can't find replaced bookie. addrs.remove(addr2.toBookieId()); repp.onClusterChanged(addrs, new HashSet()); + try { - repp.replaceBookie(1, 1, 1, null, ensemble, addr2.toBookieId(), new HashSet<>()); + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException ignore) { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java index 5f4e134db33..29f3f8d37a5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java @@ -21,9 +21,14 @@ import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues; import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; import static org.junit.Assert.assertNotEquals; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; + +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +44,7 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence; import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementResult; import org.apache.bookkeeper.client.ZoneawareEnsemblePlacementPolicyImpl.ZoneAwareNodeLocation; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -897,15 +903,40 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { ensemble.add(addr7.toBookieId()); zepp.onClusterChanged(addrs, new HashSet()); - EnsemblePlacementPolicy.PlacementResult replaceBookieResponse = zepp.replaceBookie(1, 1, 1, - null, ensemble, addr6.toBookieId(), new HashSet<>()); - BookieId replacedBookie = replaceBookieResponse.getResult(); - assertEquals(addr6.toBookieId(), replacedBookie); + + BookKeeper bookKeeper = new BookKeeper(); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(conf)); + Field field = BookKeeper.class.getDeclaredField("placementPolicy"); + field.setAccessible(true); + field.set(bookKeeper, zepp); + LedgerHandle mockHandle = mock(LedgerHandle.class); + + Map> ensembles = new HashMap<>(); + ensembles.put(0L, ensemble); + LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, + Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L, + Collections.emptyMap()); + when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata); + + //Not downgradeToSelf, can't find replaced bookie. + try { + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKException.BKNotEnoughBookiesException ignore) { + } + + //Use downgradeToSelf, find addr2 itself. + Map replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble, + Sets.newHashSet(1), Optional.empty(), true); + assertFalse(replaceResult.isEmpty()); + assertEquals(1, replaceResult.size()); + assertEquals(addr6.toBookieId(), replaceResult.get(1)); addrs.remove(addr6.toBookieId()); zepp.onClusterChanged(addrs, new HashSet()); + //Not downgradeToSelf, can't find replaced bookie. try { - zepp.replaceBookie(1, 1, 1, null, ensemble, addr6.toBookieId(), new HashSet<>()); + admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKException.BKNotEnoughBookiesException ignore) { } From 8144fa83beaa5c3e9b09b22a2c111242766ce448 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 15 Jul 2023 13:02:55 +0800 Subject: [PATCH 11/13] Fix checkstyle. --- .../java/org/apache/bookkeeper/client/BookKeeperAdmin.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 0cbe0c215ad..a3a217528d9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -1113,7 +1113,8 @@ Map getReplacementBookiesByIndexes( oldBookie, bookiesToExclude); newBookie = replaceBookieResponse.getResult(); - PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); + PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = + replaceBookieResponse.getAdheringToPolicy(); if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) { LOG.debug( "replaceBookie for bookie: {} in ensemble: {} " From d56d1872f01214130cfc1096e20a9c7cdb843c1b Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 15 Jul 2023 13:29:07 +0800 Subject: [PATCH 12/13] Fix checkstyle. --- .../client/TestRackawareEnsemblePlacementPolicy.java | 5 ++--- .../client/TestRegionAwareEnsemblePlacementPolicy.java | 4 ++-- .../client/TestZoneawareEnsemblePlacementPolicy.java | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 297dd7aea5f..37dea7be250 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -30,7 +30,6 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; - import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; @@ -74,7 +73,6 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; -import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -666,7 +664,8 @@ public void testReplaceBookieWithNoMoreBookies() throws Exception { repp.onClusterChanged(addrs, new HashSet()); BookKeeper bookKeeper = new BookKeeper(); - BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(conf)); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, + new ClientConfiguration(conf)); Field field = BookKeeper.class.getDeclaredField("placementPolicy"); field.setAccessible(true); field.set(bookKeeper, repp); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index d9b9f41a13e..bc33d3cb5f7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -33,7 +33,6 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; - import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; @@ -448,7 +447,8 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { repp.onClusterChanged(addrs, new HashSet()); BookKeeper bookKeeper = new BookKeeper(); - BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(conf)); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, + new ClientConfiguration(conf)); Field field = BookKeeper.class.getDeclaredField("placementPolicy"); field.setAccessible(true); field.set(bookKeeper, repp); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java index 29f3f8d37a5..ea0476680d4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java @@ -27,7 +27,6 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; - import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; @@ -905,12 +904,13 @@ public void testReplaceBookieWithNoMoreBookie() throws Exception { zepp.onClusterChanged(addrs, new HashSet()); BookKeeper bookKeeper = new BookKeeper(); - BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(conf)); + BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, + new ClientConfiguration(conf)); Field field = BookKeeper.class.getDeclaredField("placementPolicy"); field.setAccessible(true); field.set(bookKeeper, zepp); LedgerHandle mockHandle = mock(LedgerHandle.class); - + Map> ensembles = new HashMap<>(); ensembles.put(0L, ensemble); LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED, From b5ed675bd6a7c42c409e3e4c8143caaf4126f4d7 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 15 Jul 2023 22:03:09 +0800 Subject: [PATCH 13/13] Fix ci. --- .../client/TestRegionAwareEnsemblePlacementPolicy.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index bc33d3cb5f7..5e218357cd8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -589,7 +589,12 @@ public void testReplaceBookieWithNotEnoughBookies() throws Exception { excludedAddrs.add(addr1.toBookieId()); excludedAddrs.add(addr3.toBookieId()); excludedAddrs.add(addr4.toBookieId()); - repp.replaceBookie(1, 1, 1, null, new ArrayList(), addr2.toBookieId(), excludedAddrs); + try { + repp.replaceBookie(1, 1, 1, null, new ArrayList(), addr2.toBookieId(), excludedAddrs); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); + } catch (BKNotEnoughBookiesException bnebe) { + // should throw not enou + } } @Test