Changes from all commits
@@ -72,7 +72,7 @@ public static ConsumerHashAssignmentsSnapshot empty() {
}

public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments);
return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments);
}

/**
@@ -111,28 +111,36 @@ Map<Range, Pair<Consumer, Consumer>> diffRanges(ConsumerHashAssignmentsSnapshot
* @param mappingAfter the range mapping after the change
* @return consumers and hash ranges whose assignment changed (ranges removed from a consumer and ranges added to a consumer)
*/
static ImpactedConsumersResult resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore,
static ImpactedConsumersResult resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore,
List<HashRangeAssignment> mappingAfter) {
Map<Range, Pair<Consumer, Consumer>> impactedRanges = diffRanges(mappingBefore, mappingAfter);
Map<Consumer, SortedSet<Range>> removedRangesByConsumer = impactedRanges.entrySet().stream()
.collect(IdentityHashMap::new, (resultMap, entry) -> {
Range range = entry.getKey();
// filter out only where the range was removed
Consumer consumerBefore = entry.getValue().getLeft();
if (consumerBefore != null) {
resultMap.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range);
}
}, IdentityHashMap::putAll);
return mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new IdentityHashMap<>();
Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new IdentityHashMap<>();
impactedRanges.forEach((range, value) -> {
Consumer consumerAfter = value.getRight();

// the range is now assigned to consumerAfter (either added or re-assigned)
if (consumerAfter != null) {
addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new TreeSet<>()).add(range);
}
// the range was removed from consumerBefore
Consumer consumerBefore = value.getLeft();
if (consumerBefore != null) {
removedRangesByConsumer.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range);
}
});
var removedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
var addedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer);
return ImpactedConsumersResult.of(removedMerged, addedMerged);
}

static ImpactedConsumersResult mergedOverlappingRangesAndConvertToImpactedConsumersResult(
Map<Consumer, SortedSet<Range>> removedRangesByConsumer) {
Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>();
removedRangesByConsumer.forEach((consumer, ranges) -> {
mergedRangesByConsumer.put(consumer, RemovedHashRanges.of(mergeOverlappingRanges(ranges)));
static Map<Consumer, UpdatedHashRanges> mergedOverlappingRangesAndConvertToImpactedConsumersResult(
Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) {
Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>();
updatedRangesByConsumer.forEach((consumer, ranges) -> {
mergedRangesByConsumer.put(consumer, UpdatedHashRanges.of(mergeOverlappingRanges(ranges)));
});
return ImpactedConsumersResult.of(mergedRangesByConsumer);
return mergedRangesByConsumer;
}

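For illustration, a minimal standalone sketch of the split performed by the new resolveConsumerUpdatedHashRanges above. It uses simplified stand-in types (String consumer names plus local Range and Pair records) instead of Pulsar's Consumer, Range, and Pair; each impacted range lands in the removed set of its previous owner and in the added set of its new owner:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class RangeDiffSketch {
    record Range(int start, int end) { }
    record Pair(String before, String after) { } // consumer names; null = unassigned on that side

    public static void main(String[] args) {
        // Two ranges whose owner changed: c1 and c2 each lost a range to c3.
        Map<Range, Pair> impactedRanges = new LinkedHashMap<>();
        impactedRanges.put(new Range(0, 100), new Pair("c1", "c3"));
        impactedRanges.put(new Range(101, 200), new Pair("c2", "c3"));

        Comparator<Range> byStart = Comparator.comparingInt(Range::start);
        Map<String, SortedSet<Range>> removedByConsumer = new HashMap<>();
        Map<String, SortedSet<Range>> addedByConsumer = new HashMap<>();
        impactedRanges.forEach((range, pair) -> {
            if (pair.after() != null) {  // the range is now assigned to pair.after()
                addedByConsumer.computeIfAbsent(pair.after(), k -> new TreeSet<>(byStart)).add(range);
            }
            if (pair.before() != null) { // the range was removed from pair.before()
                removedByConsumer.computeIfAbsent(pair.before(), k -> new TreeSet<>(byStart)).add(range);
            }
        });

        // removed: {c1=[Range[start=0, end=100]], c2=[Range[start=101, end=200]]} (map order may vary)
        // added:   {c3=[Range[start=0, end=100], Range[start=101, end=200]]}
        System.out.println("removed=" + removedByConsumer);
        System.out.println("added=" + addedByConsumer);
    }
}
```

The real method then merges overlapping ranges per consumer (mergedOverlappingRangesAndConvertToImpactedConsumersResult) before wrapping both maps in an ImpactedConsumersResult.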
@@ -400,6 +400,14 @@ public boolean shouldBlockStickyKeyHash(Consumer consumer, int stickyKeyHash) {
} finally {
lock.writeLock().unlock();
}

// update the consumer-specific stats
ConsumerDrainingHashesStats drainingHashesStats =
consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
if (drainingHashesStats != null) {
drainingHashesStats.clearHash(stickyKeyHash);
}

return false;
}
// increment the blocked count which is used to determine if the hash is blocking
@@ -30,30 +30,44 @@
@EqualsAndHashCode
@ToString
public class ImpactedConsumersResult {
public interface RemovedHashRangesProcessor {
void process(Consumer consumer, RemovedHashRanges removedHashRanges);
public interface UpdatedHashRangesProcessor {
void process(Consumer consumer, UpdatedHashRanges updatedHashRanges, OperationType operationType);
}

private final Map<Consumer, RemovedHashRanges> removedHashRanges;
private final Map<Consumer, UpdatedHashRanges> removedHashRanges;
private final Map<Consumer, UpdatedHashRanges> addedHashRanges;

private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges> removedHashRanges) {
private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges> removedHashRanges,
Map<Consumer, UpdatedHashRanges> addedHashRanges) {
this.removedHashRanges = removedHashRanges;
this.addedHashRanges = addedHashRanges;
}

public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges> removedHashRanges) {
return new ImpactedConsumersResult(removedHashRanges);
public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges> removedHashRanges,
Map<Consumer, UpdatedHashRanges> addedHashRanges) {
return new ImpactedConsumersResult(removedHashRanges, addedHashRanges);
}

public void processRemovedHashRanges(RemovedHashRangesProcessor processor) {
removedHashRanges.forEach((c, r) -> processor.process(c, r));
public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor) {
removedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.REMOVE));
addedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.ADD));
}

public boolean isEmpty() {
return removedHashRanges.isEmpty() && addedHashRanges.isEmpty();
}

@VisibleForTesting
Map<Consumer, RemovedHashRanges> getRemovedHashRanges() {
Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() {
return removedHashRanges;
}

@VisibleForTesting
Map<Consumer, UpdatedHashRanges> getAddedHashRanges() {
return addedHashRanges;
}

public enum OperationType {
ADD, REMOVE
}
}
@@ -29,10 +29,10 @@
*/
@EqualsAndHashCode
@ToString
public class RemovedHashRanges {
public class UpdatedHashRanges {
private final Range[] sortedRanges;

private RemovedHashRanges(List<Range> ranges) {
private UpdatedHashRanges(List<Range> ranges) {
// Converts the list of ranges to an array to avoid iterator allocation
// when the ranges are iterated multiple times in the pending acknowledgments loop.
this.sortedRanges = ranges.toArray(new Range[0]);
@@ -52,8 +52,8 @@ private void validateSortedRanges() {
}
}

public static RemovedHashRanges of(List<Range> ranges) {
return new RemovedHashRanges(ranges);
public static UpdatedHashRanges of(List<Range> ranges) {
return new UpdatedHashRanges(ranges);
}

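The diff above only shows the renamed class skeleton; its containsStickyKey check, used by the dispatcher further down, is not visible here. A hedged sketch of how such a membership test over sorted, non-overlapping ranges can exit early (an assumption about the implementation, with int[] pairs standing in for Pulsar's Range):

```java
public class ContainsStickyKeySketch {
    // Assumed behavior: scan sorted, non-overlapping [start, end] ranges and stop
    // as soon as a range starts past the hash, since no later range can contain it.
    static boolean containsStickyKey(int[][] sortedRanges, int stickyKeyHash) {
        for (int[] range : sortedRanges) {
            if (stickyKeyHash < range[0]) {
                return false; // every remaining range starts even later
            }
            if (stickyKeyHash <= range[1]) {
                return true;  // range[0] <= stickyKeyHash <= range[1]
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[][] ranges = { { 0, 100 }, { 200, 300 } };
        System.out.println(containsStickyKey(ranges, 150)); // false
        System.out.println(containsStickyKey(ranges, 250)); // true
    }
}
```

This check runs inside the pending acknowledgments loop referenced in the constructor comment above, which is why the ranges are kept sorted and array-backed.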
@@ -170,16 +170,25 @@ public void endBatch() {

private synchronized void registerDrainingHashes(Consumer skipConsumer,
ImpactedConsumersResult impactedConsumers) {
impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges, opType) -> {
if (c != skipConsumer) {
c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
log.warn("[{}] Sticky key hash was missing for {}:{}", getName(), ledgerId, entryId);
return;
}
if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
// add the pending ack to the draining hashes tracker if the hash is in the range
drainingHashesTracker.addEntry(c, stickyKeyHash);
if (updatedHashRanges.containsStickyKey(stickyKeyHash)) {
switch (opType) {
// reduce ref count in case the stickyKeyHash was re-assigned to the original consumer
case ADD -> {
var entry = drainingHashesTracker.getEntry(stickyKeyHash);
if (entry != null && entry.getConsumer() == c) {
drainingHashesTracker.reduceRefCount(c, stickyKeyHash, false);
}
}
// add the pending ack to the draining hashes tracker if the hash is in the range
case REMOVE -> drainingHashesTracker.addEntry(c, stickyKeyHash);
}
}
});
}
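To make the new ADD branch above concrete: when a hash range ends up back on the consumer that is already draining it, the tracker should drop a reference for that hash rather than register another draining entry. A simplified, self-contained model of that bookkeeping (illustrative assumption only; the real DrainingHashesTracker also tracks the owning consumer and the blocked counts seen in shouldBlockStickyKeyHash earlier in this diff):

```java
import java.util.HashMap;
import java.util.Map;

public class DrainingRefCountSketch {
    private final Map<Integer, Integer> refCounts = new HashMap<>();

    // REMOVE case: a pending ack for this hash starts draining, so take a reference.
    void addEntry(int stickyKeyHash) {
        refCounts.merge(stickyKeyHash, 1, Integer::sum);
    }

    // ADD case: the hash was re-assigned back to the draining consumer, so drop a
    // reference; the entry disappears when the count reaches zero.
    void reduceRefCount(int stickyKeyHash) {
        refCounts.computeIfPresent(stickyKeyHash, (hash, count) -> count > 1 ? count - 1 : null);
    }

    public static void main(String[] args) {
        DrainingRefCountSketch tracker = new DrainingRefCountSketch();
        tracker.addEntry(42);
        tracker.addEntry(42);
        tracker.reduceRefCount(42); // one pending ack's hash moved back to its original consumer
        System.out.println(tracker.refCounts); // {42=1}
    }
}
```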
@@ -36,7 +36,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.assertj.core.data.Offset;
import org.mockito.Mockito;
@@ -47,7 +46,7 @@
public class ConsistentHashingStickyKeyConsumerSelectorTest {

@Test
public void testConsumerSelect() throws ConsumerAssignException {
public void testConsumerSelect() {

ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200);
String key1 = "anyKey";
@@ -151,7 +150,7 @@ public void testConsumerSelect() throws ConsumerAssignException {


@Test
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
public void testGetConsumerKeyHashRanges() {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
List<Consumer> consumers = new ArrayList<>();
@@ -201,8 +200,7 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
}

@Test
public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
throws BrokerServiceException.ConsumerAssignException {
public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200);
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 20; i++) {
@@ -530,11 +528,24 @@ public void testShouldNotSwapExistingConsumers() {
selector.removeConsumer(consumer);

ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges())
ImpactedConsumersResult impactedConsumersAfterRemoval = assignmentsBefore
.resolveImpactedConsumers(assignmentsAfter);
assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges())
.describedAs(
"when a consumer is removed, the removed hash ranges should only be from "
+ "the removed consumer")
.containsOnlyKeys(consumer);
List<Range> allAddedRangesAfterRemoval = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
impactedConsumersAfterRemoval.getAddedHashRanges().values().stream()
.map(UpdatedHashRanges::asRanges).flatMap(List::stream)
.collect(Collectors.toCollection(TreeSet::new))
);
assertThat(allAddedRangesAfterRemoval)
.describedAs(
"when a consumer is removed, all its hash ranges should appear "
+ "in added hash ranges"
)
.containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer));
assignmentsBefore = assignmentsAfter;

// add consumer back
@@ -543,8 +554,9 @@
assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
List<Range> addedConsumerRanges = assignmentsAfter.getRangesByConsumer().get(consumer);

Map<Consumer, RemovedHashRanges> removedHashRanges =
assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
ImpactedConsumersResult impactedConsumersAfterAdding = assignmentsBefore
.resolveImpactedConsumers(assignmentsAfter);
Map<Consumer, UpdatedHashRanges> removedHashRanges = impactedConsumersAfterAdding.getRemovedHashRanges();
ConsumerHashAssignmentsSnapshot finalAssignmentsBefore = assignmentsBefore;
assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> {
assertThat(removedHashRange
@@ -558,12 +570,19 @@

List<Range> allRemovedRanges =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
removedHashRanges.entrySet().stream().map(Map.Entry::getValue)
.map(RemovedHashRanges::asRanges)
removedHashRanges.values().stream()
.map(UpdatedHashRanges::asRanges)
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
assertThat(allRemovedRanges)
.describedAs("all removed ranges should be the same as the ranges of the added consumer")
.containsExactlyElementsOf(addedConsumerRanges);
List<Range> allAddedRangesAfterAdding = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
impactedConsumersAfterAdding.getAddedHashRanges().values().stream()
.map(UpdatedHashRanges::asRanges)
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
assertThat(addedConsumerRanges)
.describedAs("all added ranges should be the same as the ranges of the added consumer")
.containsExactlyElementsOf(allAddedRangesAfterAdding);

assignmentsBefore = assignmentsAfter;
}