Changes from all commits (43 commits)
34932e2
[WIP] KAFKA-18376: High CPU load when AsyncKafkaConsumer uses a small…
kirktrue Sep 10, 2025
b5d7d01
[WIP] More work on correctness
kirktrue Sep 11, 2025
d4802c7
Re-enabling tests in AsyncKafkaConsumer
kirktrue Sep 11, 2025
9fd7e58
Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread
kirktrue Sep 15, 2025
d3fa910
Minor clean up from design review
kirktrue Sep 15, 2025
dbc4773
Updates to fix inverted logic in maybeInterruptCompositePoll()
kirktrue Sep 15, 2025
09f8cb5
Add documentation for RequiresApplicationThreadExecution
kirktrue Sep 15, 2025
5e794ce
Inject NetworkClientDelegate via supplier for ApplicationEventProcess…
kirktrue Sep 15, 2025
464d5ba
Removed the verbose logging
kirktrue Sep 16, 2025
d253b84
Work in progress to get past most of the integration test issues
kirktrue Sep 17, 2025
aaefbef
Clean up logic related to metadata errors that can happen along any s…
kirktrue Sep 17, 2025
40f6754
Minor updates for CompletableEventReaper logging
kirktrue Sep 17, 2025
3e0b920
Refactor CompositePollEvent to use Blocker for state management
kirktrue Sep 18, 2025
529aab3
Update AsyncKafkaConsumer.java
kirktrue Sep 18, 2025
784aad2
Moving toward a non-blocking poll() implementation
kirktrue Sep 19, 2025
c6a7923
Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread
kirktrue Sep 19, 2025
524782c
Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thre…
kirktrue Sep 19, 2025
00f9069
Merge branch 'KAFKA-18376-chain-events-in-background-thread' into KAF…
kirktrue Sep 19, 2025
0ac19f9
Clean up
kirktrue Sep 19, 2025
ae0ddcc
Add completion tracking to CompositePollEvent
kirktrue Sep 20, 2025
6775aac
Refactor poll event handling and metadata error propagation
kirktrue Sep 20, 2025
2c3547e
Inject NetworkClientDelegate into ApplicationEventProcessor
kirktrue Sep 20, 2025
18f4fa1
Remove BackgroundEventHandler from OffsetsRequestManager
kirktrue Sep 20, 2025
ea99a13
Handle immediate metadata errors for CompletableEvents
kirktrue Sep 20, 2025
56062f5
Update NetworkClientDelegate.java
kirktrue Sep 20, 2025
9d65fa2
Merge pull request #10 from kirktrue/KAFKA-18376-chain-events-in-back…
kirktrue Sep 20, 2025
99304db
Remove extra whitespace in NetworkClientDelegate
kirktrue Sep 20, 2025
702b257
Revert removal of contains() from CompletableEventReaper
kirktrue Sep 20, 2025
81b707e
Update NetworkClientDelegate.java
kirktrue Sep 20, 2025
8159884
Refactor application thread requirement handling
kirktrue Sep 20, 2025
7112022
Update AsyncKafkaConsumer.java
kirktrue Sep 20, 2025
f40a4ac
Refactor consumer record polling in tests
kirktrue Sep 23, 2025
abaa4dc
Reset backoff on event submission in AsyncKafkaConsumer
kirktrue Sep 23, 2025
1570f69
Handle exceptions in AsyncKafkaConsumer poll event
kirktrue Sep 23, 2025
1e52282
Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread
kirktrue Sep 23, 2025
2d21fa0
Refactor poll event handling and metadata error management
kirktrue Sep 23, 2025
b193770
Refactor CompositePollEventInvoker to standalone class
kirktrue Sep 23, 2025
2302427
Remove debug logging from CompositePollEventInvoker
kirktrue Sep 24, 2025
91e881f
Fix typo in CompositePollPseudoEvent class name
kirktrue Sep 24, 2025
8b33f08
Refactor lambda in waitForConsumerPollException call
kirktrue Sep 24, 2025
2aaca8d
Move pollForRecords helper method in KafkaConsumerTest
kirktrue Sep 24, 2025
1f1ae24
Remove commented-out event processing code
kirktrue Sep 24, 2025
985bbd7
Improve consumer poll reliability in integration tests
kirktrue Sep 24, 2025
@@ -49,6 +49,7 @@
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
@@ -59,7 +60,6 @@
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@@ -174,6 +174,31 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

private static final long NO_CURRENT_THREAD = -1L;

private static class AsyncConsumerApplicationThreadRequirement implements ApplicationEventProcessor.CompositePollApplicationThreadRequirement {

private final BackgroundEventHandler backgroundEventHandler;
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;

public AsyncConsumerApplicationThreadRequirement(BackgroundEventHandler backgroundEventHandler,
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker) {
this.backgroundEventHandler = backgroundEventHandler;
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
}

@Override
public Optional<CompositePollEvent.State> requirement() {
// If there are background events to process, exit to the application thread.
if (backgroundEventHandler.size() > 0)
return Optional.of(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);

// If there are enqueued callbacks to invoke, exit to the application thread.
if (offsetCommitCallbackInvoker.size() > 0)
return Optional.of(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED);

return Optional.empty();
}
}
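
For orientation, a hypothetical sketch (not part of this diff) of how the background thread might consult a requirement like this between chained stages of a composite poll. The completeEvent and continueChain helpers and the surrounding variables are assumptions for illustration, not the actual ApplicationEventProcessor API:

    // Between chained stages, ask whether control must return to the application thread.
    Optional<CompositePollEvent.State> required = applicationThreadRequirement.requirement();

    if (required.isPresent()) {
        // Background events or commit callbacks are pending: finish the event in that state
        // so CompositePollEventInvoker can run them and resubmit the next stage.
        completeEvent(event, required.get(), nextEventType);
    } else {
        // Nothing needs the application thread; keep chaining in the background thread.
        continueChain(event, nextEventType);
    }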

/**
* An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
* application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
@@ -325,8 +350,8 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();

// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement;
private final CompositePollEventInvoker pollInvoker;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@@ -461,15 +486,24 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
memberStateListener,
streamsRebalanceData
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement(
backgroundEventHandler,
offsetCommitCallbackInvoker
);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
subscriptions,
requestManagersSupplier);
requestManagersSupplier,
networkClientDelegateSupplier,
asyncApplicationThreadRequirement,
applicationEventReaper
);
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventReaper,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
@@ -485,6 +519,13 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
new StreamsRebalanceListenerInvoker(logContext, s));
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
this.pollInvoker = new CompositePollEventInvoker(
logContext,
time,
applicationEventHandler,
this::processBackgroundEvents,
offsetCommitCallbackInvoker::executeCallbacks
);

// The FetchCollector is only used on the application thread.
this.fetchCollector = fetchCollectorFactory.build(logContext,
@@ -559,11 +600,22 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.pollInvoker = new CompositePollEventInvoker(
logContext,
time,
applicationEventHandler,
this::processBackgroundEvents,
offsetCommitCallbackInvoker::executeCallbacks
);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
asyncConsumerMetrics
);
this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement(
backgroundEventHandler,
offsetCommitCallbackInvoker
);
}

AsyncKafkaConsumer(LogContext logContext,
@@ -623,7 +675,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
new RebalanceCallbackMetricsManager(metrics)
);
ApiVersions apiVersions = new ApiVersions();
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(
time,
config,
logContext,
@@ -652,23 +704,38 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
memberStateListener,
Optional.empty()
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement(
backgroundEventHandler,
offsetCommitCallbackInvoker
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
subscriptions,
requestManagersSupplier
requestManagersSupplier,
networkClientDelegateSupplier,
asyncApplicationThreadRequirement,
applicationEventReaper
);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
time,
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventReaper,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics);
this.streamsRebalanceListenerInvoker = Optional.empty();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
this.pollInvoker = new CompositePollEventInvoker(
logContext,
time,
applicationEventHandler,
this::processBackgroundEvents,
offsetCommitCallbackInvoker::executeCallbacks
);
}

// auxiliary interface for testing
@@ -833,22 +900,13 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}

do {
PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());

// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
// of the fetches. A wakeup between returned fetches and returning records would lead to never
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
wakeupTrigger.maybeTriggerWakeup();

updateAssignmentMetadataIfNeeded(timer);
pollInvoker.poll(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
@@ -1771,18 +1829,6 @@ private Fetch<K, V> pollForFetches(Timer timer) {
return fetch;
}

// send any new fetches (won't resend pending fetches)
sendFetches(timer);

// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure

// NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}

log.trace("Polling for fetches with timeout {}", pollTimeout);

Timer pollTimer = time.timer(pollTimeout);
@@ -1834,11 +1880,10 @@ private Fetch<K, V> collectFetch() {
* defined
*/
private boolean updateFetchPositions(final Timer timer) {
cachedSubscriptionHasAllFetchPositions = false;
try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
} catch (TimeoutException e) {
return false;
} finally {
@@ -1856,41 +1901,6 @@ private boolean isCommittedOffsetsManagementEnabled() {
return groupMetadata.get().isPresent();
}

/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
*
* <p/>
*
* This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
* of the same name:
*
* <ul>
* <li>
* The method will wait for confirmation of the request creation before continuing.
* </li>
* <li>
* The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
* </li>
* <li>
* The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
* Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
* That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
* Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests}
* as it can handle requests that are created after the timeout.
* </li>
* </ul>
*
* @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
* is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
*/
private void sendFetches(Timer timer) {
try {
applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException swallow) {
// Can be ignored, per above comments.
}
}

/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
* pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

import org.slf4j.Logger;

import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;

public class CompositePollEventInvoker {

private final Logger log;
private final Time time;
private final ApplicationEventHandler applicationEventHandler;
private final Runnable backgroundEventProcessor;
private final Runnable offsetCommitProcessor;
private CompositePollEvent inflight;

public CompositePollEventInvoker(LogContext logContext,
Time time,
ApplicationEventHandler applicationEventHandler,
Runnable backgroundEventProcessor,
Runnable offsetCommitProcessor) {
this.log = logContext.logger(getClass());
this.time = time;
this.applicationEventHandler = applicationEventHandler;
this.backgroundEventProcessor = backgroundEventProcessor;
this.offsetCommitProcessor = offsetCommitProcessor;
}

public void poll(Timer timer) {
if (inflight == null) {
log.debug("No existing inflight event, submitting");
submitEvent(ApplicationEvent.Type.POLL, timer);
}

try {
if (log.isTraceEnabled()) {
log.trace(
"Attempting to retrieve result from previously submitted {} with {} remaining on timer",
inflight,
timer.remainingMs()
);
}

CompositePollEvent.Result result = inflight.resultOrError();
CompositePollEvent.State state = result.state();

if (state == CompositePollEvent.State.COMPLETE) {
// Make sure to clear out the inflight request since it's complete.
log.debug("Event {} completed, clearing inflight", inflight);
inflight = null;
} else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) {
log.debug("About to process background events");
backgroundEventProcessor.run();
log.debug("Done processing background events");
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
} else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
log.debug("About to process offset commits");
offsetCommitProcessor.run();
log.debug("Done processing offset commits");
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
Comment on lines +79 to +83 (Member):

this bit is also one I wonder if needed, or if we can just trigger the commit callbacks directly from the app thread poll loop (offsetCommitCallbackInvoker.executeCallbacks()). If that works, no need for this extra state or all the related offsetCommitProcessor logic

(A hypothetical sketch of this alternative follows the class below.)

} else if (state == CompositePollEvent.State.UNKNOWN) {
throw new KafkaException("Unexpected poll result received");
}
} catch (Throwable t) {
// If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
// because the error effectively renders it complete.
log.debug("Event {} \"completed\" via error ({}), clearing inflight", inflight, String.valueOf(t));
inflight = null;
throw ConsumerUtils.maybeWrapAsKafkaException(t);
}
}

private void submitEvent(ApplicationEvent.Type type, Timer timer) {
long deadlineMs = calculateDeadlineMs(timer);
inflight = new CompositePollEvent(deadlineMs, time.milliseconds(), type);
applicationEventHandler.add(inflight);
log.debug("Submitted new {} with {} remaining on timer", inflight, timer.remainingMs());
}
}
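
To make the review comment above (lines +79 to +83) concrete: a minimal, hypothetical sketch of the alternative it suggests, in which the application-thread poll loop drains the commit callbacks directly, so the OFFSET_COMMIT_CALLBACKS_REQUIRED state and the offsetCommitProcessor indirection would no longer be needed. This is an excerpt-style illustration, not part of this diff, and assumes the callbacks are safe to run unconditionally on each pass:

    // In AsyncKafkaConsumer.poll(), run any enqueued offset-commit callbacks on every pass...
    do {
        offsetCommitCallbackInvoker.executeCallbacks();
        pollInvoker.poll(timer);
        final Fetch<K, V> fetch = pollForFetches(timer);
        // ... existing record-handling logic unchanged ...
    } while (timer.notExpired());

    // ...so CompositePollEventInvoker.poll() could drop this branch entirely:
    // } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
    //     offsetCommitProcessor.run();
    //     result.nextEventType().ifPresent(t -> submitEvent(t, timer));
    // }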
@@ -77,6 +77,9 @@ protected void maybeThrowAuthFailure(Node node) {
* @return Future on which the caller can wait to ensure that the requests have been created
*/
public CompletableFuture<Void> createFetchRequests() {
if (!fetchBuffer.isEmpty())
return CompletableFuture.completedFuture(null);

CompletableFuture<Void> future = new CompletableFuture<>();

if (pendingFetchRequestFuture != null) {