@@ -72,7 +72,7 @@ public static ConsumerHashAssignmentsSnapshot empty() {
     }
 
     public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
-        return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments);
+        return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments);
     }
 
@@ -111,36 +111,28 @@ Map<Range, Pair<Consumer, Consumer>> diffRanges(ConsumerHashAssignmentsSnapshot
      * @param mappingAfter the range mapping after the change
      * @return consumers and ranges where the existing range changed
      */
-    static ImpactedConsumersResult resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore,
+    static ImpactedConsumersResult resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore,
                                                                     List<HashRangeAssignment> mappingAfter) {
         Map<Range, Pair<Consumer, Consumer>> impactedRanges = diffRanges(mappingBefore, mappingAfter);
-        Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new IdentityHashMap<>();
-        Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new IdentityHashMap<>();
-        impactedRanges.forEach((range, value) -> {
-            Consumer consumerAfter = value.getRight();
-
-            // last consumer was removed
-            if (consumerAfter != null) {
-                addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new TreeSet<>()).add(range);
-            }
-            // filter out only where the range was removed
-            Consumer consumerBefore = value.getLeft();
-            if (consumerBefore != null) {
-                removedRangesByConsumer.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range);
-            }
-        });
-        var removedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
-        var addedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer);
-        return ImpactedConsumersResult.of(removedMerged, addedMerged);
+        Map<Consumer, SortedSet<Range>> removedRangesByConsumer = impactedRanges.entrySet().stream()
+                .collect(IdentityHashMap::new, (resultMap, entry) -> {
+                    Range range = entry.getKey();
+                    // filter out only where the range was removed
+                    Consumer consumerBefore = entry.getValue().getLeft();
+                    if (consumerBefore != null) {
+                        resultMap.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range);
+                    }
+                }, IdentityHashMap::putAll);
+        return mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
     }
 
-    static Map<Consumer, UpdatedHashRanges> mergedOverlappingRangesAndConvertToImpactedConsumersResult(
-            Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) {
-        Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>();
-        updatedRangesByConsumer.forEach((consumer, ranges) -> {
-            mergedRangesByConsumer.put(consumer, UpdatedHashRanges.of(mergeOverlappingRanges(ranges)));
+    static ImpactedConsumersResult mergedOverlappingRangesAndConvertToImpactedConsumersResult(
+            Map<Consumer, SortedSet<Range>> removedRangesByConsumer) {
+        Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>();
+        removedRangesByConsumer.forEach((consumer, ranges) -> {
+            mergedRangesByConsumer.put(consumer, RemovedHashRanges.of(mergeOverlappingRanges(ranges)));
         });
-        return mergedRangesByConsumer;
+        return ImpactedConsumersResult.of(mergedRangesByConsumer);
     }
 
     /**
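To make the new resolution step easier to follow, here is a minimal, self-contained Java sketch of the same before/after idea. The Consumer, Range, and HashRangeAssignment types are simplified stand-ins (strings and small records), and the exact-range comparison stands in for diffRanges, so treat it as an illustration of the approach rather than Pulsar's implementation.

// Simplified stand-ins for Pulsar's Consumer/Range/HashRangeAssignment types.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RemovedRangesSketch {
    record Range(int start, int end) {}
    record Assignment(Range range, String consumer) {}

    // For each hash range present before, report it as "removed" for its
    // previous owner when a different consumer (or no consumer) owns the
    // same range afterwards.
    static Map<String, List<Range>> removedRangesByConsumer(
            List<Assignment> before, List<Assignment> after) {
        Map<String, List<Range>> removed = new TreeMap<>();
        for (Assignment b : before) {
            boolean stillOwned = after.stream()
                    .anyMatch(a -> a.range().equals(b.range())
                            && a.consumer().equals(b.consumer()));
            if (!stillOwned) {
                removed.computeIfAbsent(b.consumer(), k -> new ArrayList<>())
                        .add(b.range());
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<Assignment> before = List.of(
                new Assignment(new Range(0, 100), "c1"),
                new Assignment(new Range(101, 200), "c2"));
        List<Assignment> after = List.of(
                new Assignment(new Range(0, 100), "c1"),
                new Assignment(new Range(101, 200), "c1"));
        // Prints {c2=[Range[start=101, end=200]]}: c2 lost 101-200 to c1.
        System.out.println(removedRangesByConsumer(before, after));
    }
}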
@@ -400,14 +400,6 @@ public boolean shouldBlockStickyKeyHash(Consumer consumer, int stickyKeyHash) {
             } finally {
                 lock.writeLock().unlock();
             }
-
-            // update the consumer specific stats
-            ConsumerDrainingHashesStats drainingHashesStats =
-                    consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
-            if (drainingHashesStats != null) {
-                drainingHashesStats.clearHash(stickyKeyHash);
-            }
-
             return false;
         }
         // increment the blocked count which is used to determine if the hash is blocking
@@ -30,44 +30,30 @@
 @EqualsAndHashCode
 @ToString
 public class ImpactedConsumersResult {
-    public interface UpdatedHashRangesProcessor {
-        void process(Consumer consumer, UpdatedHashRanges updatedHashRanges, OperationType operationType);
+    public interface RemovedHashRangesProcessor {
+        void process(Consumer consumer, RemovedHashRanges removedHashRanges);
     }
 
-    private final Map<Consumer, UpdatedHashRanges> removedHashRanges;
-    private final Map<Consumer, UpdatedHashRanges> addedHashRanges;
+    private final Map<Consumer, RemovedHashRanges> removedHashRanges;
 
-    private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges> removedHashRanges,
-                                    Map<Consumer, UpdatedHashRanges> addedHashRanges) {
+    private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges> removedHashRanges) {
         this.removedHashRanges = removedHashRanges;
-        this.addedHashRanges = addedHashRanges;
     }
 
-    public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges> removedHashRanges,
-                                             Map<Consumer, UpdatedHashRanges> addedHashRanges) {
-        return new ImpactedConsumersResult(removedHashRanges, addedHashRanges);
+    public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges> removedHashRanges) {
+        return new ImpactedConsumersResult(removedHashRanges);
    }
 
-    public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor) {
-        removedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.REMOVE));
-        addedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.ADD));
+    public void processRemovedHashRanges(RemovedHashRangesProcessor processor) {
+        removedHashRanges.forEach((c, r) -> processor.process(c, r));
     }
 
     public boolean isEmpty() {
         return removedHashRanges.isEmpty();
     }
 
     @VisibleForTesting
-    Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() {
+    Map<Consumer, RemovedHashRanges> getRemovedHashRanges() {
         return removedHashRanges;
     }
-
-    @VisibleForTesting
-    Map<Consumer, UpdatedHashRanges> getAddedHashRanges() {
-        return addedHashRanges;
-    }
-
-    public enum OperationType {
-        ADD, REMOVE
-    }
 }
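The map handed to ImpactedConsumersResult.of is produced after each consumer's removed ranges have been merged. A small sketch of that merge step over a sorted set of ranges; the assumption that directly adjacent ranges are coalesced along with overlapping ones is mine, not confirmed by this diff.

// Merging overlapping/adjacent sorted ranges, analogous to what
// mergeOverlappingRanges does before wrapping results in RemovedHashRanges.
// The Range record is a simplified stand-in for Pulsar's Range class.
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

public class MergeRangesSketch {
    record Range(int start, int end) implements Comparable<Range> {
        public int compareTo(Range o) {
            return start != o.start ? Integer.compare(start, o.start)
                                    : Integer.compare(end, o.end);
        }
    }

    static List<Range> mergeOverlappingRanges(SortedSet<Range> ranges) {
        List<Range> merged = new ArrayList<>();
        for (Range r : ranges) {
            if (!merged.isEmpty()
                    && r.start() <= merged.get(merged.size() - 1).end() + 1) {
                // Overlapping or adjacent: extend the previously merged range.
                Range last = merged.remove(merged.size() - 1);
                merged.add(new Range(last.start(), Math.max(last.end(), r.end())));
            } else {
                merged.add(r);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        SortedSet<Range> ranges = new TreeSet<>(List.of(
                new Range(0, 10), new Range(5, 20),
                new Range(21, 30), new Range(40, 50)));
        // Prints [Range[start=0, end=30], Range[start=40, end=50]]
        System.out.println(mergeOverlappingRanges(ranges));
    }
}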
@@ -29,10 +29,10 @@
  */
 @EqualsAndHashCode
 @ToString
-public class UpdatedHashRanges {
+public class RemovedHashRanges {
     private final Range[] sortedRanges;
 
-    private UpdatedHashRanges(List<Range> ranges) {
+    private RemovedHashRanges(List<Range> ranges) {
         // Converts the set of ranges to an array to avoid iterator allocation
         // when the ranges are iterated multiple times in the pending acknowledgments loop.
         this.sortedRanges = ranges.toArray(new Range[0]);
@@ -52,8 +52,8 @@ private void validateSortedRanges() {
         }
     }
 
-    public static UpdatedHashRanges of(List<Range> ranges) {
-        return new UpdatedHashRanges(ranges);
+    public static RemovedHashRanges of(List<Range> ranges) {
+        return new RemovedHashRanges(ranges);
     }
 
     /**
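RemovedHashRanges keeps its ranges in a sorted array so lookups can scan without allocating an iterator. A hedged sketch of what such a containsStickyKey-style lookup can look like; the early exit relies on the sort order, but the real method's exact loop shape is an assumption.

// Lookup over a sorted Range array; Range is a simplified stand-in.
public class ContainsStickyKeySketch {
    record Range(int start, int end) {}

    static boolean containsStickyKey(Range[] sortedRanges, int stickyKeyHash) {
        for (Range range : sortedRanges) {
            if (range.start() > stickyKeyHash) {
                // Ranges are sorted by start: no later range can contain the hash.
                return false;
            }
            if (stickyKeyHash <= range.end()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Range[] ranges = { new Range(0, 10), new Range(20, 30) };
        System.out.println(containsStickyKey(ranges, 25)); // true
        System.out.println(containsStickyKey(ranges, 15)); // false
    }
}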
@@ -170,25 +170,16 @@ public void endBatch() {
 
     private synchronized void registerDrainingHashes(Consumer skipConsumer,
                                                      ImpactedConsumersResult impactedConsumers) {
-        impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges, opType) -> {
+        impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
             if (c != skipConsumer) {
                 c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
                     if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
                         log.warn("[{}] Sticky key hash was missing for {}:{}", getName(), ledgerId, entryId);
                         return;
                     }
-                    if (updatedHashRanges.containsStickyKey(stickyKeyHash)) {
-                        switch (opType) {
-                            // reduce ref count in case the stickyKeyHash was re-assigned to the original consumer
-                            case ADD -> {
-                                var entry = drainingHashesTracker.getEntry(stickyKeyHash);
-                                if (entry != null && entry.getConsumer() == c) {
-                                    drainingHashesTracker.reduceRefCount(c, stickyKeyHash, false);
-                                }
-                            }
-                            // add the pending ack to the draining hashes tracker if the hash is in the range
-                            case REMOVE -> drainingHashesTracker.addEntry(c, stickyKeyHash);
-                        }
+                    if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
+                        // add the pending ack to the draining hashes tracker if the hash is in the range
+                        drainingHashesTracker.addEntry(c, stickyKeyHash);
                     }
                 });
             }
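For context on the registerDrainingHashes change above, a simplified runnable sketch of the per-pending-ack decision. PendingAck, the RemovedHashRanges interface, the sentinel value, and returning a list (instead of calling drainingHashesTracker.addEntry) are all stand-ins for Pulsar's types.

import java.util.ArrayList;
import java.util.List;

public class DrainingRegistrationSketch {
    // Assumption: sentinel used when a message was dispatched without a hash.
    static final int STICKY_KEY_HASH_NOT_SET = 0;

    record PendingAck(long ledgerId, long entryId, int stickyKeyHash) {}

    interface RemovedHashRanges {
        boolean containsStickyKey(int stickyKeyHash);
    }

    // Walk a consumer's pending acks and collect the hashes falling inside
    // ranges the consumer just lost; the real code registers each of these
    // with the draining hashes tracker instead of returning a list.
    static List<Integer> hashesToDrain(List<PendingAck> pendingAcks,
                                       RemovedHashRanges removedHashRanges) {
        List<Integer> toDrain = new ArrayList<>();
        for (PendingAck ack : pendingAcks) {
            if (ack.stickyKeyHash() == STICKY_KEY_HASH_NOT_SET) {
                continue; // the real code logs a warning for a missing hash
            }
            if (removedHashRanges.containsStickyKey(ack.stickyKeyHash())) {
                toDrain.add(ack.stickyKeyHash());
            }
        }
        return toDrain;
    }

    public static void main(String[] args) {
        RemovedHashRanges removed = hash -> hash >= 100 && hash <= 200;
        List<PendingAck> acks = List.of(
                new PendingAck(1, 1, 150), new PendingAck(1, 2, 250));
        System.out.println(hashesToDrain(acks, removed)); // [150]
    }
}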
@@ -36,6 +36,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
 import org.assertj.core.data.Offset;
 import org.mockito.Mockito;
@@ -46,7 +47,7 @@
 public class ConsistentHashingStickyKeyConsumerSelectorTest {
 
     @Test
-    public void testConsumerSelect() {
+    public void testConsumerSelect() throws ConsumerAssignException {
 
         ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200);
         String key1 = "anyKey";
@@ -150,7 +151,7 @@ public void testConsumerSelect() {
 
 
     @Test
-    public void testGetConsumerKeyHashRanges() {
+    public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
         ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
         List<Consumer> consumers = new ArrayList<>();
@@ -200,7 +201,8 @@ public void testGetConsumerKeyHashRanges() {
     }
 
     @Test
-    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() {
+    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
+            throws BrokerServiceException.ConsumerAssignException {
         ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200);
         List<Consumer> consumers = new ArrayList<>();
         for (int i = 0; i < 20; i++) {
@@ -528,24 +530,11 @@ public void testShouldNotSwapExistingConsumers() {
             selector.removeConsumer(consumer);
 
             ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
-            ImpactedConsumersResult impactedConsumersAfterRemoval = assignmentsBefore
-                    .resolveImpactedConsumers(assignmentsAfter);
-            assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges())
+            assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges())
                     .describedAs(
                             "when a consumer is removed, the removed hash ranges should only be from "
                                     + "the removed consumer")
                     .containsOnlyKeys(consumer);
-            List<Range> allAddedRangesAfterRemoval = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                    impactedConsumersAfterRemoval.getAddedHashRanges().values().stream()
-                            .map(UpdatedHashRanges::asRanges).flatMap(List::stream)
-                            .collect(Collectors.toCollection(TreeSet::new))
-            );
-            assertThat(allAddedRangesAfterRemoval)
-                    .describedAs(
-                            "when a consumer is removed, all its hash ranges should appear "
-                                    + "in added hash ranges"
-                    )
-                    .containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer));
             assignmentsBefore = assignmentsAfter;
 
             // add consumer back
@@ -554,9 +543,8 @@
             assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
             List<Range> addedConsumerRanges = assignmentsAfter.getRangesByConsumer().get(consumer);
 
-            ImpactedConsumersResult impactedConsumersAfterAdding = assignmentsBefore
-                    .resolveImpactedConsumers(assignmentsAfter);
-            Map<Consumer, UpdatedHashRanges> removedHashRanges = impactedConsumersAfterAdding.getRemovedHashRanges();
+            Map<Consumer, RemovedHashRanges> removedHashRanges =
+                    assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
             ConsumerHashAssignmentsSnapshot finalAssignmentsBefore = assignmentsBefore;
             assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> {
                 assertThat(removedHashRange
@@ -570,19 +558,12 @@
 
             List<Range> allRemovedRanges =
                     ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                            removedHashRanges.values().stream()
-                                    .map(UpdatedHashRanges::asRanges)
+                            removedHashRanges.entrySet().stream().map(Map.Entry::getValue)
+                                    .map(RemovedHashRanges::asRanges)
                                     .flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
             assertThat(allRemovedRanges)
                     .describedAs("all removed ranges should be the same as the ranges of the added consumer")
                     .containsExactlyElementsOf(addedConsumerRanges);
-            List<Range> allAddedRangesAfterAdding = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                    impactedConsumersAfterAdding.getAddedHashRanges().values().stream()
-                            .map(UpdatedHashRanges::asRanges)
-                            .flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
-            assertThat(addedConsumerRanges)
-                    .describedAs("all added ranges should be the same as the ranges of the added consumer")
-                    .containsExactlyElementsOf(allAddedRangesAfterAdding);
 
             assignmentsBefore = assignmentsAfter;
         }