Skip to content

Commit 84aa033

Browse files
committed
feat: implement AckMode RECORD_FILTERED
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
1 parent 8640e72 commit 84aa033

File tree

3 files changed

+99
-8
lines changed

3 files changed

+99
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
112112
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
113113
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
114+
import org.springframework.kafka.listener.adapter.FilteringAware;
114115
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
115116
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
116117
import org.springframework.kafka.support.Acknowledgment;
@@ -172,6 +173,7 @@
172173
* @author Christian Fredriksson
173174
* @author Timofey Barabanov
174175
* @author Janek Lasocki-Biczysko
176+
* @author Chaedong Im
175177
*/
176178
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
177179
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -677,6 +679,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
677679

678680
private final boolean isRecordAck;
679681

682+
private final boolean isRecordFilteredAck;
683+
680684
private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();
681685

682686
private final BlockingQueue<TopicPartitionOffset> seeks = new LinkedBlockingQueue<>();
@@ -871,6 +875,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
871875
this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode);
872876
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
873877
this.isRecordAck = this.ackMode.equals(AckMode.RECORD);
878+
this.isRecordFilteredAck = this.ackMode.equals(AckMode.RECORD_FILTERED);
874879
boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies;
875880
this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null;
876881
this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null;
@@ -933,8 +938,8 @@ else if (listener instanceof MessageListener) {
933938
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
934939
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
935940
this.commonErrorHandler = determineCommonErrorHandler();
936-
Assert.state(!this.isBatchListener || !this.isRecordAck,
937-
"Cannot use AckMode.RECORD with a batch listener");
941+
Assert.state(!this.isBatchListener || (!this.isRecordAck && !this.isRecordFilteredAck),
942+
"Cannot use AckMode.RECORD or AckMode.RECORD_FILTERED with a batch listener");
938943
if (this.containerProperties.getScheduler() != null) {
939944
this.taskScheduler = this.containerProperties.getScheduler();
940945
this.taskSchedulerExplicitlySet = true;
@@ -1510,7 +1515,7 @@ protected void handleAsyncFailure() {
15101515
}
15111516

15121517
private void doProcessCommits() {
1513-
if (!this.autoCommit && !this.isRecordAck) {
1518+
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
15141519
try {
15151520
processCommits();
15161521
}
@@ -2260,7 +2265,7 @@ private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V>
22602265
}
22612266
getAfterRollbackProcessor().clearThreadState();
22622267
}
2263-
if (!this.autoCommit && !this.isRecordAck) {
2268+
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
22642269
processCommits();
22652270
}
22662271
}
@@ -2710,7 +2715,7 @@ private void listenerInfo(final ConsumerRecord<K, V> cRecord) {
27102715
}
27112716

27122717
private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> cRecord) {
2713-
if (!this.autoCommit && !this.isRecordAck) {
2718+
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
27142719
processCommits();
27152720
}
27162721
List<ConsumerRecord<?, ?>> list = new ArrayList<>();
@@ -3060,12 +3065,26 @@ public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) {
30603065
}
30613066
}
30623067

3068+
private boolean isRecordFiltered(ConsumerRecord<K, V> cRecord) {
3069+
Object listener = KafkaMessageListenerContainer.this.getContainerProperties().getMessageListener();
3070+
if (listener instanceof FilteringAware<?, ?>) {
3071+
@SuppressWarnings("unchecked")
3072+
FilteringAware<K, V> filteringAware = (FilteringAware<K, V>) listener;
3073+
return filteringAware.wasFiltered(cRecord);
3074+
}
3075+
return false;
3076+
}
3077+
30633078
public void ackCurrent(final ConsumerRecord<K, V> cRecord) {
30643079
ackCurrent(cRecord, false);
30653080
}
30663081

30673082
public void ackCurrent(final ConsumerRecord<K, V> cRecord, boolean commitRecovered) {
3068-
if (this.isRecordAck && this.producer == null) {
3083+
if (this.isRecordFilteredAck && isRecordFiltered(cRecord)) {
3084+
return;
3085+
}
3086+
3087+
if ((this.isRecordAck || this.isRecordFilteredAck) && this.producer == null) {
30693088
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildSingleCommits(cRecord);
30703089
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
30713090
commitOffsets(offsetsToCommit);
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2024-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
21+
/**
22+
* An interface to indicate that a message listener adapter can report
23+
* whether a record was filtered during processing.
24+
*
25+
* @param <K> the key type.
26+
* @param <V> the value type.
27+
*
28+
* @author Chaedong Im
29+
* @since 4.0
30+
*/
31+
public interface FilteringAware<K, V> {
32+
33+
/**
34+
* Check if the most recent record processed was filtered out.
35+
* This method should be called after a record has been processed
36+
* to determine if the record was filtered and should not trigger
37+
* an offset commit in RECORD_FILTERED acknowledge mode.
38+
* @param record the record to check
39+
* @return true if the record was filtered, false if it was processed
40+
*/
41+
boolean wasFiltered(ConsumerRecord<K, V> record);
42+
43+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import java.util.concurrent.atomic.AtomicReference;
20+
1921
import org.apache.kafka.clients.consumer.Consumer;
2022
import org.apache.kafka.clients.consumer.ConsumerRecord;
2123
import org.jspecify.annotations.Nullable;
@@ -32,14 +34,30 @@
3234
* @param <V> the value type.
3335
*
3436
* @author Gary Russell
37+
* @author Chaedong Im
3538
*
3639
*/
3740
public class FilteringMessageListenerAdapter<K, V>
3841
extends AbstractFilteringMessageListener<K, V, MessageListener<K, V>>
39-
implements AcknowledgingConsumerAwareMessageListener<K, V> {
42+
implements AcknowledgingConsumerAwareMessageListener<K, V>, FilteringAware<K, V> {
43+
44+
private static class FilterResult<K, V> {
45+
46+
final ConsumerRecord<K, V> record;
47+
48+
final boolean wasFiltered;
49+
50+
FilterResult(ConsumerRecord<K, V> record, boolean wasFiltered) {
51+
this.record = record;
52+
this.wasFiltered = wasFiltered;
53+
}
54+
55+
}
4056

4157
private final boolean ackDiscarded;
4258

59+
private final AtomicReference<@Nullable FilterResult<K, V>> lastResult = new AtomicReference<>();
60+
4361
/**
4462
* Create an instance with the supplied strategy and delegate listener.
4563
* @param delegate the delegate.
@@ -68,7 +86,12 @@ public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
6886
public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment,
6987
@Nullable Consumer<?, ?> consumer) {
7088

71-
if (!filter(consumerRecord)) {
89+
boolean filtered = filter(consumerRecord);
90+
91+
// Atomically update both the record and its filtered state together
92+
this.lastResult.set(new FilterResult<>(consumerRecord, filtered));
93+
94+
if (!filtered) {
7295
switch (this.delegateType) {
7396
case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
7497
case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
@@ -93,6 +116,12 @@ private void ackFilteredIfNecessary(@Nullable Acknowledgment acknowledgment) {
93116
}
94117
}
95118

119+
@Override
120+
public boolean wasFiltered(ConsumerRecord<K, V> record) {
121+
FilterResult<K, V> result = this.lastResult.get();
122+
return result != null && result.record == record && result.wasFiltered;
123+
}
124+
96125
/*
97126
* Since the container uses the delegate's type to determine which method to call, we
98127
* must implement them all.

0 commit comments

Comments
 (0)