Conversation

@jeffxiang (Contributor) commented Sep 24, 2025

Allow MemQ producers to maintain a sticky set of N brokers for round-robin writes. Spreading writes across N sticky connections probabilistically reduces the variance in broker throughput compared to a single sticky connection.

The idea is to maintain the favorable properties of the current endpoint selection algorithm but expand it to accommodate N endpoints in rotation. The favorable properties we want to maintain are:

  • Stickiness: Once working endpoints are chosen, we want to keep using them. This reduces connection counts.
  • Randomness: Each AZ-local endpoint should have equal probability of being chosen. This ensures even distribution across restarts.
  • Greediness: We should choose the first N endpoints that are successful. This simplifies the selection logic and maintenance overhead.

To accommodate N endpoints in rotation, we make only a small change to the original endpoint selection algorithm: instead of the singleton currentEndpoint, we keep a set of writeEndpoints with a maximum size of numWriteEndpoints. The modified algorithm looks similar to the existing one:

1. Discover topic metadata → Set<Endpoint> of broker endpoints that host the topic.
2. Filter down to only the AZ-local endpoints and shuffle them into random order.
3. While writeEndpoints.size() < numWriteEndpoints:
     • On each new write, try the first endpoint in the AZ-local list.
     • If it succeeds, add it to writeEndpoints.
     • If it fails, go down the list until one succeeds and add that one to writeEndpoints.
     • Rotate the AZ-local list by 1 index.
4. Once writeEndpoints.size() == numWriteEndpoints, simply rotate writeEndpoints by 1 for every future write and try the first endpoint.
5. If an endpoint is unreachable or fails, remove it from writeEndpoints and repeat step 3.
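The selection loop above can be sketched in a few dozen lines. This is a minimal illustration only: the class and method names (WriteEndpointRotation, nextEndpoint, deprioritize) are hypothetical, endpoints are plain strings, and the real logic in MemqCommonClient additionally handles metadata refresh and retries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the sticky N-endpoint rotation; names are illustrative,
// not the actual MemqCommonClient API.
public class WriteEndpointRotation {

    private final int numWriteEndpoints;
    private final Deque<String> localityEndpoints;     // AZ-local candidates
    private final Deque<String> writeEndpoints = new ArrayDeque<>();

    public WriteEndpointRotation(int numWriteEndpoints, List<String> azLocalEndpoints) {
        this.numWriteEndpoints = numWriteEndpoints;
        List<String> shuffled = new ArrayList<>(azLocalEndpoints);
        Collections.shuffle(shuffled);                 // randomness across restarts
        this.localityEndpoints = new ArrayDeque<>(shuffled);
    }

    /** Pick the endpoint for the next write; connect simulates a connection attempt. */
    public String nextEndpoint(Predicate<String> connect) {
        // Greedily fill the sticky set until it holds numWriteEndpoints endpoints.
        while (writeEndpoints.size() < numWriteEndpoints) {
            boolean added = false;
            for (int i = 0; i < localityEndpoints.size() && !added; i++) {
                String candidate = localityEndpoints.pollFirst();
                localityEndpoints.addLast(candidate);  // rotate AZ-local list by 1
                if (!writeEndpoints.contains(candidate) && connect.test(candidate)) {
                    writeEndpoints.addLast(candidate); // stickiness: keep using it
                    added = true;
                }
            }
            if (!added) {
                break;  // no further reachable endpoint; run with what we have
            }
        }
        if (writeEndpoints.isEmpty()) {
            throw new IllegalStateException("No reachable AZ-local endpoints");
        }
        // Round-robin: rotate the sticky set by 1 and use its head.
        String chosen = writeEndpoints.pollFirst();
        writeEndpoints.addLast(chosen);
        return chosen;
    }

    /** On a dead endpoint, drop it; the fill loop replaces it on the next write. */
    public void deprioritize(String endpoint) {
        writeEndpoints.remove(endpoint);
    }

    public static void main(String[] args) {
        WriteEndpointRotation rotation =
            new WriteEndpointRotation(2, List.of("broker1", "broker2", "broker3"));
        // Simulate broker2 being unreachable; writes rotate over the other two.
        for (int i = 0; i < 4; i++) {
            System.out.println(rotation.nextEndpoint(e -> !e.equals("broker2")));
        }
    }
}
```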

Changes are made to the following core classes:

  • MemqCommonClient: modified endpoint selection algorithm
  • NetworkClient: introduce a Map<InetSocketAddress, ChannelFuture> channelPool to maintain an open channel per endpoint
  • ResponseHandler: introduce two maps, channelToRequests and requestsToChannel, to track inflight requests across multiple channels
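The NetworkClient change can be pictured as a small connection cache keyed by address. The sketch below uses a stand-in Channel interface instead of Netty's ChannelFuture so it stays self-contained; class and method names are illustrative, not the actual NetworkClient API.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of a per-endpoint channel pool as described for NetworkClient.
// "Channel" is a stand-in for Netty's ChannelFuture.
public class ChannelPoolSketch {

    public interface Channel {
        boolean isActive();
        void close();
    }

    private final Map<InetSocketAddress, Channel> channelPool = new ConcurrentHashMap<>();

    /** Return the cached channel for the endpoint, dialing a new one if absent or dead. */
    public Channel channelFor(InetSocketAddress endpoint, Function<InetSocketAddress, Channel> dial) {
        return channelPool.compute(endpoint, (addr, existing) ->
            existing != null && existing.isActive() ? existing : dial.apply(addr));
    }

    /** Close and drop the channel when its endpoint leaves the write set. */
    public void evict(InetSocketAddress endpoint) {
        Channel removed = channelPool.remove(endpoint);
        if (removed != null) {
            removed.close();
        }
    }

    public int openChannels() {
        return channelPool.size();
    }
}
```

Keeping one long-lived channel per sticky endpoint preserves the low connection count that stickiness was buying in the single-endpoint design.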

Testing:

  • Unit test coverage for changes in core classes
  • Functional testing in prod-like environments, including chaos scenarios such as multiple dead brokers, rolling restarts/replacements, etc.

@jeffxiang jeffxiang marked this pull request as ready for review October 27, 2025 15:01
@jeffxiang jeffxiang requested a review from a team as a code owner October 27, 2025 15:01
@vahidhashemian left a comment

Thanks! Left a few comments. Also, it would be great to add:

  • Javadoc for the new code/methods/configs
  • Metrics where it makes sense

```java
    deprioritizeDeadEndpoint(endpoint, topic); // this endpoint is down even after retries in NetworkClient; remove it from the write endpoints and take another one from locality endpoints
} catch (Exception ex) {
    logger.error("Failed to refresh write endpoints", ex);
    throw e;
```

Should it throw ex too?

@jeffxiang (Contributor, Author) replied:

We're throwing e instead of ex because ex is a secondary error thrown by deprioritizeDeadEndpoint(); the main issue is the ConnectException.

Comment on lines 365 to 370

```java
if (currentWrites.size() < numWriteEndpoints && !currentWrites.contains(endpoint)) {
    logger.info("Registering write endpoint: " + endpoint + " for topic: " + topic);
    List<Endpoint> newWrites = new ArrayList<>(currentWrites);
    newWrites.add(endpoint);
    this.writeEndpoints = Collections.unmodifiableList(newWrites);
}
```

Is it still possible for two threads to overwrite each other in this if block?

@jeffxiang (Contributor, Author) replied:

Hypothetically yes, but the caller of MemqCommonClient is the Request.Dispatch thread, which is single-threaded: https://github.com/pinterest/memq/blob/main/memq-client/src/main/java/com/pinterest/memq/client/producer2/RequestManager.java#L84
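Since dispatch is single-threaded, the copy-on-write in the quoted snippet is safe today. If that assumption ever changed, the check-then-act could be guarded with a lock while keeping the lock-free read path; a sketch under that assumption (hypothetical class, not the actual MemqCommonClient code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical hardening of the copy-on-write registration for
// multi-threaded callers; not the actual MemqCommonClient code.
public class WriteEndpointRegistry {

    private final int numWriteEndpoints;
    // volatile: readers always observe a fully built, immutable snapshot
    private volatile List<String> writeEndpoints = Collections.emptyList();

    public WriteEndpointRegistry(int numWriteEndpoints) {
        this.numWriteEndpoints = numWriteEndpoints;
    }

    /** Atomically register an endpoint if there is room; true if it was added. */
    public synchronized boolean register(String endpoint) {
        List<String> current = writeEndpoints;
        if (current.size() >= numWriteEndpoints || current.contains(endpoint)) {
            return false;   // full, or already registered
        }
        List<String> next = new ArrayList<>(current);
        next.add(endpoint);
        writeEndpoints = Collections.unmodifiableList(next);
        return true;
    }

    /** Lock-free read path: callers iterate over an immutable snapshot. */
    public List<String> snapshot() {
        return writeEndpoints;
    }
}
```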

@jeffxiang jeffxiang merged commit a1fb975 into main Nov 3, 2025
1 check passed
@jeffxiang jeffxiang deleted the producer_balancing branch November 3, 2025 17:09