Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -1061,14 +1062,15 @@ private Map<Integer, BookieId> getReplacementBookies(
}
}
return getReplacementBookiesByIndexes(
lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate));
lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate), false);
}

private Map<Integer, BookieId> getReplacementBookiesByIndexes(
@VisibleForTesting
Map<Integer, BookieId> getReplacementBookiesByIndexes(
LedgerHandle lh,
List<BookieId> ensemble,
Set<Integer> bookieIndexesToRereplicate,
Optional<Set<BookieId>> excludedBookies)
Optional<Set<BookieId>> excludedBookies, boolean downgradeToSelf)
throws BKException.BKNotEnoughBookiesException {
// target bookies to replicate
Map<Integer, BookieId> targetBookieAddresses =
Expand All @@ -1078,35 +1080,58 @@ private Map<Integer, BookieId> getReplacementBookiesByIndexes(
if (excludedBookies.isPresent()) {
bookiesToExclude.addAll(excludedBookies.get());
}
//We want to prioritize replacing offline nodes, so we put the indices of offline nodes at the beginning.
List<Integer> orderedBookieIndexesToRereplicate = new LinkedList<>();
for (Integer index : bookieIndexesToRereplicate) {
boolean alive = ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive(
ensemble.get(index));
if (alive) {
orderedBookieIndexesToRereplicate.add(index);
} else {
orderedBookieIndexesToRereplicate.add(0, index);
}
}

// excluding bookies that need to be replicated
for (Integer bookieIndex : bookieIndexesToRereplicate) {
for (Integer bookieIndex : orderedBookieIndexesToRereplicate) {
BookieId bookie = ensemble.get(bookieIndex);
bookiesToExclude.add(bookie);
}

List<BookieId> newEnsemble = new ArrayList<>(ensemble);
// allocate bookies
for (Integer bookieIndex : bookieIndexesToRereplicate) {
BookieId oldBookie = ensemble.get(bookieIndex);
EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
lh.getLedgerMetadata().getCustomMetadata(),
ensemble,
oldBookie,
bookiesToExclude);
BookieId newBookie = replaceBookieResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) {
LOG.debug(
"replaceBookie for bookie: {} in ensemble: {} "
+ "is not adhering to placement policy and chose {}",
oldBookie, ensemble, newBookie);
for (Integer bookieIndex : orderedBookieIndexesToRereplicate) {
BookieId oldBookie = newEnsemble.get(bookieIndex);
BookieId newBookie;
try {
EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
lh.getLedgerMetadata().getCustomMetadata(),
newEnsemble,
oldBookie,
bookiesToExclude);
newBookie = replaceBookieResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL && LOG.isDebugEnabled()) {
LOG.debug(
"replaceBookie for bookie: {} in ensemble: {} "
+ "is not adhering to placement policy and chose {}",
oldBookie, newEnsemble, newBookie);
}
} catch (BKException.BKNotEnoughBookiesException e) {
if (downgradeToSelf && ((TopologyAwareEnsemblePlacementPolicy) bkc.getPlacementPolicy()).isAlive(
oldBookie)) {
newBookie = oldBookie;
} else {
throw e;
}
}
targetBookieAddresses.put(bookieIndex, newBookie);
bookiesToExclude.add(newBookie);
newEnsemble.set(bookieIndex, newBookie);
}

return targetBookieAddresses;
Expand Down Expand Up @@ -1136,7 +1161,7 @@ public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledger
if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {
Optional<Set<BookieId>> excludedBookies = Optional.empty();
targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
ledgerFragment.getBookiesIndexes(), excludedBookies, true);
} else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) {
targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(),
lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,4 +846,8 @@ protected BookieNode convertBookieToNode(BookieId addr) {
}
return bn;
}

public boolean isAlive(BookieId bookieId) {
return knownBookies.containsKey(bookieId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -47,6 +51,7 @@
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
Expand Down Expand Up @@ -636,6 +641,67 @@ public void testIsEnsembleAdheringToPlacementPolicy() throws Exception {
repp.isEnsembleAdheringToPlacementPolicy(ensemble, 3, 3));
}

@Test
public void testReplaceBookieWithNoMoreBookies() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3");
// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());

List<BookieId> ensemble = new ArrayList<>();
ensemble.add(addr1.toBookieId());
ensemble.add(addr2.toBookieId());
ensemble.add(addr3.toBookieId());

repp.onClusterChanged(addrs, new HashSet<BookieId>());

BookKeeper bookKeeper = new BookKeeper();
BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE,
new ClientConfiguration(conf));
Field field = BookKeeper.class.getDeclaredField("placementPolicy");
field.setAccessible(true);
field.set(bookKeeper, repp);
LedgerHandle mockHandle = mock(LedgerHandle.class);

Map<Long, List<BookieId>> ensembles = new HashMap<>();
ensembles.put(0L, ensemble);
LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED,
Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L,
Collections.emptyMap());
when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata);
//Not downgradeToSelf, can't find replaced bookie.
try {
admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException ignore) {
}

//Use downgradeToSelf, find addr2 itself.
Map<Integer, BookieId> replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble,
Sets.newHashSet(1), Optional.empty(), true);
assertFalse(replaceResult.isEmpty());
assertEquals(1, replaceResult.size());
assertEquals(addr2.toBookieId(), replaceResult.get(1));

//Make addr2 shutdown, then can't find replaced bookie.
addrs.remove(addr2.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

try {
admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException ignore) {
}
}

@Test
public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_REGIONS_TO_WRITE;
import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -42,6 +46,7 @@
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeature;
Expand Down Expand Up @@ -418,6 +423,68 @@ public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
assertEquals(expectedSet, reorderSet);
}

@Test
public void testReplaceBookieWithNoMoreBookie() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3");

// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());

List<BookieId> ensemble = new ArrayList<>();
ensemble.add(addr1.toBookieId());
ensemble.add(addr2.toBookieId());
ensemble.add(addr3.toBookieId());

repp.onClusterChanged(addrs, new HashSet<BookieId>());

BookKeeper bookKeeper = new BookKeeper();
BookKeeperAdmin admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE,
new ClientConfiguration(conf));
Field field = BookKeeper.class.getDeclaredField("placementPolicy");
field.setAccessible(true);
field.set(bookKeeper, repp);
LedgerHandle mockHandle = mock(LedgerHandle.class);
Map<Long, List<BookieId>> ensembles = new HashMap<>();
ensembles.put(0L, ensemble);
LedgerMetadataImpl ledgerMetadata = new LedgerMetadataImpl(0, 0, 3, 3, 2, LedgerMetadata.State.CLOSED,
Optional.of(1L), Optional.of(10L), ensembles, Optional.empty(), Optional.empty(), 0L, false, 0L,
Collections.emptyMap());
when(mockHandle.getLedgerMetadata()).thenReturn(ledgerMetadata);

//Not downgradeToSelf, can't find replaced bookie.
try {
admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException ignore) {
}

//Use downgradeToSelf, find addr2 itself.
Map<Integer, BookieId> replaceResult = admin.getReplacementBookiesByIndexes(mockHandle, ensemble,
Sets.newHashSet(1), Optional.empty(), true);
assertFalse(replaceResult.isEmpty());
assertEquals(1, replaceResult.size());
assertEquals(addr2.toBookieId(), replaceResult.get(1));

//Make addr2 shutdown, then can't find replaced bookie.
addrs.remove(addr2.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

try {
admin.getReplacementBookiesByIndexes(mockHandle, ensemble, Sets.newHashSet(1), Optional.empty(), false);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException ignore) {
}
}

@Test
public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
Expand Down
Loading