Skip to content

Conversation

kirktrue
Copy link
Contributor

Introduces CompositePollEvent and CompositePollResult to streamline the poll event handling in AsyncKafkaConsumer and ApplicationEventProcessor. The new approach enables multi-step polling logic, where possible.

… max poll value

Introduces CompositePollEvent and CompositePollResult to refactor and streamline the poll event handling in AsyncKafkaConsumer and ApplicationEventProcessor. The new approach enables multi-step polling logic, improves callback and background event processing, and enhances testability. Also adds size methods to BackgroundEventHandler and OffsetCommitCallbackInvoker, disables several tests, and updates related classes to support the new event flow.
@github-actions github-actions bot added triage PRs from the community consumer clients labels Sep 10, 2025
Added a Javadoc comment to the RequiresApplicationThreadExecution interface to clarify its purpose and usage, specifically regarding the need to interrupt CompositePollEvent processing when requiresApplicationThread() returns true.
…or to check metadata errors

Refactors AsyncKafkaConsumer, ShareConsumerImpl, and ApplicationEventProcessor to inject NetworkClientDelegate using a supplier method. Adds a static supplier factory to NetworkClientDelegate for deferred instantiation. Updates related tests and construction logic to support the new dependency injection approach.
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Replaces CompletableFuture-based state handling in CompositePollEvent with a new Blocker class for improved synchronization and exception handling. Updates AsyncKafkaConsumer, WakeupTrigger, ApplicationEventProcessor, and related tests to use Blocker, simplifying event completion and error propagation.
…KA-18376-chain-events-in-background-thread-without-addAndGet
Introduces an AtomicBoolean to track completion state in CompositePollEvent and updates ApplicationEventProcessor to mark events as complete when appropriate. Refactors AsyncKafkaConsumer to use a new CompositePollEventInvoker for polling, replacing prepareFetch, and implements exponential backoff for incomplete events.
Refactored AsyncKafkaConsumer and related classes to improve composite poll event handling, including explicit state management and pausing for background event processing or offset commit callbacks. Metadata errors are now optionally propagated via a dedicated error field in NetworkClientDelegate, allowing for more flexible error handling. Updated tests and logging to reflect these changes.
Adds NetworkClientDelegate as a dependency to ApplicationEventProcessor and updates AsyncKafkaConsumer and ShareConsumerImpl to supply it. Introduces error handling in composite poll processing using metadata errors from NetworkClientDelegate. Updates related tests to mock the new dependency.
Eliminated the BackgroundEventHandler parameter from OffsetsRequestManager and its usages in RequestManagers and related tests. This simplifies the constructor and removes unnecessary dependencies.
Added logic to check and fail CompletableEvents for metadata errors immediately upon processing, ensuring events that do not enter the awaiting state are handled correctly. Updated related tests to use consistent mocks and reduced poll durations for faster execution.
Cleaned up unnecessary whitespace and blank lines in NetworkClientDelegate.java to improve code readability.
Introduces AsyncConsumerApplicationThreadRequirement to encapsulate logic for determining when to pause event processing for application thread execution. Updates ApplicationEventProcessor and related classes to use a unified CompositePollApplicationThreadRequirement interface, simplifying constructor signatures and improving code clarity.
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Replaces direct calls to consumer.poll with a new pollForRecords() helper method in multiple test cases. This improves code reuse and reliability by waiting for records to be available, and removes unnecessary suppress warnings and unchecked casts.
Sets backoff to -1 when submitting a new application event in AsyncKafkaConsumer. This ensures backoff state is reset for each event submission.
Wrap and propagate exceptions from resultOrError in AsyncKafkaConsumer, ensuring latest request is cleared when an error occurs. Also clear latest when poll event completes to properly track request lifecycle.
Simplifies AsyncKafkaConsumer's CompositePollEventInvoker by removing backoff logic and streamlining state handling. NetworkClientDelegate now uses AtomicReference for metadataError to improve thread safety. ApplicationEventProcessor refines error handling in composite poll events. Updates tests to reflect API changes and exception types.
@github-actions github-actions bot added the core Kafka Broker label Sep 23, 2025
Moved CompositePollEventInvoker from AsyncKafkaConsumer to its own file for better separation of concerns and testability. Updated AsyncKafkaConsumer to use the new class and refactored constructors accordingly. Enhanced related tests to use new helper methods for polling and exception handling, improving test clarity and reliability.
Eliminated a debug log statement that printed the result and state in CompositePollEventInvoker. This helps reduce unnecessary log output during normal operation.
Renamed CompositePollPsuedoEvent to CompositePollPseudoEvent in ApplicationEventProcessor to correct a spelling error and ensure consistency in class naming.
Replaces single-line lambda with block lambda for clarity in the waitForConsumerPollException call within AsyncKafkaConsumerTest. No functional changes; improves readability.
Relocated the pollForRecords() helper method from its previous position to after the testPollIdleRatio method for improved code organization in KafkaConsumerTest.
Deleted commented lines for processBackgroundEvents and offsetCommitCallbackInvoker.executeCallbacks in AsyncKafkaConsumer, cleaning up unused code.
Replaced direct assertions on consumer.poll() with TestUtils.waitUntilTrue in several integration tests to ensure expected results or exceptions are observed within a timeout. Also refactored CompositePollEventInvoker to rename 'latest' to 'inflight' for clarity and improved logging. These changes enhance test robustness and code readability.
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just two pointers from our sync earlier, thanks!

Comment on lines +253 to +260
if (nextEventType == ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA) {
log.debug("Processing {} logic for {}", nextEventType, event);
processUpdatePatternSubscriptionEvent();
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;

if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, nextEventType))
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This maybe we don't really need it? given that we only need to ensure we re-eval regex vs metadata before sending topics to the broker, and we do have a hook for that already (onConsumerPoll). So what about removing this all together, and move just the processUpdatePatternSubscriptionEvent to onConsumerPoll

(renamed probably, it's not about events really, it's more of a ~maybeUpdatePatternSubscription)

Comment on lines +79 to +83
} else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
log.debug("About to process offset commits");
offsetCommitProcessor.run();
log.debug("Done processing offset commits");
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bit is also one I wonder if needed, or if we can just trigger the commit callbacks direclty from the app thread poll loop (offsetCommitCallbackInvoker.executeCallbacks()) . If that works, no need for this extra state or all the related offsetCommitProcessor logic

@github-actions github-actions bot removed needs-attention triage PRs from the community labels Sep 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants