Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
static final int SLOW_MASK = 0x20 << 24;
static final int UNAVAIL_MASK = 0x40 << 24;
static final int MASK_BITS = 0xFFF << 20;
static final String UNKNOWN_RACK = "UnknownRack";

protected HashedWheelTimer timer;
// Use a loading cache so slow bookies are expired. Use entryId as values.
protected Cache<BookieId, Long> slowBookies;
protected BookieNode localNode;
protected String myRack;
protected boolean reorderReadsRandom = false;
protected boolean enforceDurability = false;
protected int stabilizePeriodSeconds = 0;
Expand Down Expand Up @@ -217,7 +219,7 @@ public Integer getSample() {
LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.",
localNode, null == localNode ? "Unknown" : localNode.getNetworkLocation(),
dnsResolver.getClass().getName());

myRack = getLocalRack(localNode);
this.isWeighted = isWeighted;
if (this.isWeighted) {
this.maxWeightMultiple = maxWeightMultiple;
Expand Down Expand Up @@ -815,10 +817,12 @@ public DistributionSchedule.WriteSet reorderReadSequence(
DistributionSchedule.WriteSet writeSet) {
Map<Integer, String> writeSetWithRegion = new HashMap<>();
for (int i = 0; i < writeSet.size(); i++) {
writeSetWithRegion.put(writeSet.get(i), "");
int idx = writeSet.get(i);
writeSetWithRegion.put(idx, resolveNetworkLocation(ensemble.get(idx)));
}
return reorderReadSequenceWithRegion(
ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size());
ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, true,
myRack, writeSet.size());
}

/**
Expand Down Expand Up @@ -858,7 +862,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
boolean regionAware,
String myRegion,
int remoteNodeInReorderSequence) {
boolean useRegionAware = regionAware && (!myRegion.equals(UNKNOWN_REGION));
boolean useRegionAware = regionAware && !UNKNOWN_REGION.equals(myRegion) && !UNKNOWN_RACK.equals(myRegion);
int ensembleSize = ensemble.size();

// For rack aware, If all the bookies in the write set are available, simply return the original write set,
Expand Down Expand Up @@ -950,7 +954,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
long slowIdx = numPendingReqs * ensembleSize + idx;
writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK);
} else {
if (useRegionAware && !myRegion.equals(region)) {
if (useRegionAware && !region.equals(myRegion)) {
writeSet.set(i, idx | REMOTE_MASK);
} else {
writeSet.set(i, idx | LOCAL_MASK);
Expand All @@ -959,7 +963,7 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
} else {
// use bookies with earlier failed entryIds first
long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
if (useRegionAware && !myRegion.equals(region)) {
if (useRegionAware && !region.equals(myRegion)) {
writeSet.set(i, (int) (failIdx & ~MASK_BITS) | REMOTE_FAIL_MASK);
} else {
writeSet.set(i, (int) (failIdx & ~MASK_BITS) | LOCAL_FAIL_MASK);
Expand Down Expand Up @@ -1318,4 +1322,11 @@ private BookieNode replaceToAdherePlacementPolicyInternal(
}
throw new BKNotEnoughBookiesException();
}

protected String getLocalRack(BookieNode node) {
if (null == node || null == node.getAddr()) {
return UNKNOWN_RACK;
}
return node.getNetworkLocation();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the network location is /default-rack, shall we think of it as UNKNOWN_RACK?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -223,7 +224,7 @@ public void testNodeReadOnly() throws Exception {
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(), writeSet);
LOG.info("reorder set : {}", reorderSet);
assertEquals(reorderSet, origWriteSet);
assertNotEquals(reorderSet, origWriteSet);
}

@Test
Expand Down Expand Up @@ -1642,6 +1643,214 @@ public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws Except
}
}

@Test
public void testReadRequestReorderWithLocalNodeAware() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3");

repp.uninitalize();
updateMyRack("/default-region/r2");

repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderReadSequenceEnabled(true);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

//update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

List<BookieId> testEnsemble = new ArrayList<>();
testEnsemble.add(addr1.toBookieId());
testEnsemble.add(addr2.toBookieId());
testEnsemble.add(addr3.toBookieId());
testEnsemble.add(addr4.toBookieId());
DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2);

DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
testEnsemble, getBookiesHealthInfo(), testWriteSet);
assertNotEquals(reorderSet, origWriteSet);
assertEquals(reorderSet.get(0), origWriteSet.get(2));
}

@Test
public void testReadRequestReorderWithLocalNodeIgnore() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3");

repp.uninitalize();

repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderReadSequenceEnabled(true);
conf.setIgnoreLocalNodeInPlacementPolicy(true);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

//update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

List<BookieId> testEnsemble = new ArrayList<>();
testEnsemble.add(addr1.toBookieId());
testEnsemble.add(addr2.toBookieId());
testEnsemble.add(addr3.toBookieId());
testEnsemble.add(addr4.toBookieId());
DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 0, 1, 2);

DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
testEnsemble, getBookiesHealthInfo(), testWriteSet);
assertTrue(reorderSet.equals(origWriteSet));
}

@Test
public void testReadRequestReorderWithLocalAndHealthInfo() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r2");

repp.uninitalize();
updateMyRack("/default-region/r2");

repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderReadSequenceEnabled(true);
conf.setReorderThresholdPendingRequests(10);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

//update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

List<BookieId> testEnsemble = new ArrayList<>();
testEnsemble.add(addr1.toBookieId());
testEnsemble.add(addr2.toBookieId());
testEnsemble.add(addr3.toBookieId());
testEnsemble.add(addr4.toBookieId());
testEnsemble.add(addr5.toBookieId());
DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 4, 0, 1);

Map<BookieId, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1.toBookieId(), 120L);
bookiePendingMap.put(addr2.toBookieId(), 70L);
bookiePendingMap.put(addr3.toBookieId(), 50L);
bookiePendingMap.put(addr4.toBookieId(), 209L);
bookiePendingMap.put(addr5.toBookieId(), 15L);

DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
testEnsemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), testWriteSet);
assertNotEquals(reorderSet, origWriteSet);
assertEquals(reorderSet.get(0), origWriteSet.get(4));
assertEquals(reorderSet.get(1), origWriteSet.get(2));
}

@Test
public void testReadRequestReorderWithLocalAndHealthInfoV2() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r2");

repp.uninitalize();
updateMyRack("/default-region/r2");

repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderReadSequenceEnabled(true);
conf.setReorderThresholdPendingRequests(10);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

//update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

List<BookieId> testEnsemble = new ArrayList<>();
testEnsemble.add(addr1.toBookieId());
testEnsemble.add(addr2.toBookieId());
testEnsemble.add(addr3.toBookieId());
testEnsemble.add(addr4.toBookieId());
testEnsemble.add(addr5.toBookieId());
DistributionSchedule.WriteSet testWriteSet = writeSetFromValues(0, 1, 2, 3, 4, 0, 1);

Map<BookieId, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1.toBookieId(), 120L);
bookiePendingMap.put(addr2.toBookieId(), 70L);
bookiePendingMap.put(addr3.toBookieId(), 100L);
bookiePendingMap.put(addr4.toBookieId(), 209L);
bookiePendingMap.put(addr5.toBookieId(), 90L);

Map<BookieId, Long> bookieFailureHistory = new HashMap<>();
bookieFailureHistory.put(addr1.toBookieId(), 1L);
bookieFailureHistory.put(addr2.toBookieId(), 0L);
bookieFailureHistory.put(addr3.toBookieId(), 20L);
bookieFailureHistory.put(addr4.toBookieId(), 12L);
bookieFailureHistory.put(addr5.toBookieId(), 15L);

DistributionSchedule.WriteSet origWriteSet = testWriteSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
testEnsemble, getBookiesHealthInfo(bookieFailureHistory, bookiePendingMap), testWriteSet);
assertNotEquals(reorderSet, origWriteSet);
assertEquals(reorderSet.get(0), origWriteSet.get(2));
assertEquals(reorderSet.get(1), origWriteSet.get(4));
}

@Test
public void testNewEnsembleWithPickDifferentRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
Expand Down
Loading