Skip to content

Commit 0cb722e

Browse files
authored
GH-1496: Handle RetriableCommitFailedException
Resolves #1496. Certain commit exceptions are retriable. **I will back-port as necessary.** * Fix import
1 parent 96efd3f commit 0cb722e

File tree

4 files changed

+130
-9
lines changed

4 files changed

+130
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ConsumerProperties {
4444
*/
4545
public static final long DEFAULT_POLL_TIMEOUT = 5_000L;
4646

47+
private static final int DEFAULT_COMMIT_RETRIES = 3;
48+
4749
/**
4850
* Topic names.
4951
*/
@@ -99,6 +101,8 @@ public class ConsumerProperties {
99101

100102
private Duration authorizationExceptionRetryInterval;
101103

104+
private int commitRetries = DEFAULT_COMMIT_RETRIES;
105+
102106
/**
103107
* Create properties for a container that will subscribe to the specified topics.
104108
* @param topics the topics.
@@ -348,6 +352,28 @@ public void setAuthorizationExceptionRetryInterval(Duration authorizationExcepti
348352
this.authorizationExceptionRetryInterval = authorizationExceptionRetryInterval;
349353
}
350354

355+
/**
356+
* The number of retries allowed when a
357+
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
358+
* by the consumer.
359+
* @return the number of retries.
360+
* @since 2.3.9
361+
*/
362+
public int getCommitRetries() {
363+
return this.commitRetries;
364+
}
365+
366+
/**
367+
* Set number of retries allowed when a
368+
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
369+
* by the consumer. Default 3 (4 attempts total).
370+
* @param commitRetries the commitRetries.
371+
* @since 2.3.9
372+
*/
373+
public void setCommitRetries(int commitRetries) {
374+
this.commitRetries = commitRetries;
375+
}
376+
351377
@Override
352378
public String toString() {
353379
return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5353
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
5454
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
55+
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
5556
import org.apache.kafka.clients.producer.Producer;
5657
import org.apache.kafka.common.Metric;
5758
import org.apache.kafka.common.MetricName;
@@ -60,6 +61,7 @@
6061
import org.apache.kafka.common.errors.FencedInstanceIdException;
6162
import org.apache.kafka.common.errors.ProducerFencedException;
6263
import org.apache.kafka.common.errors.RebalanceInProgressException;
64+
import org.apache.kafka.common.errors.RetriableException;
6365
import org.apache.kafka.common.errors.WakeupException;
6466
import org.apache.kafka.common.header.internals.RecordHeader;
6567

@@ -1236,6 +1238,10 @@ private void wrapUp() {
12361238
* @param e the exception.
12371239
*/
12381240
protected void handleConsumerException(Exception e) {
1241+
if (e instanceof RetriableCommitFailedException) {
1242+
this.logger.error(e, "Commit retries exhausted");
1243+
return;
1244+
}
12391245
try {
12401246
if (!this.isBatchListener && this.errorHandler != null) {
12411247
this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
@@ -1318,10 +1324,21 @@ else if (this.syncCommits) {
13181324
commitSync(commits);
13191325
}
13201326
else {
1321-
this.consumer.commitAsync(commits, this.commitCallback);
1327+
commitAsync(commits, 0);
13221328
}
13231329
}
13241330

1331+
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
1332+
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
1333+
if (exception instanceof RetriableException && retries < this.containerProperties.getCommitRetries()) {
1334+
commitAsync(commits, retries + 1);
1335+
}
1336+
else {
1337+
this.commitCallback.onComplete(offsetsAttempted, exception);
1338+
}
1339+
});
1340+
}
1341+
13251342
private void invokeListener(final ConsumerRecords<K, V> records) {
13261343
if (this.isBatchListener) {
13271344
invokeBatchListener(records);
@@ -1903,7 +1920,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
19031920
commitSync(offsetsToCommit);
19041921
}
19051922
else {
1906-
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
1923+
commitAsync(offsetsToCommit, 0);
19071924
}
19081925
}
19091926
else {
@@ -2134,7 +2151,7 @@ private void commitIfNecessary() {
21342151
commitSync(commits);
21352152
}
21362153
else {
2137-
this.consumer.commitAsync(commits, this.commitCallback);
2154+
commitAsync(commits, 0);
21382155
}
21392156
}
21402157
catch (@SuppressWarnings(UNUSED) WakeupException e) {
@@ -2145,9 +2162,19 @@ private void commitIfNecessary() {
21452162
}
21462163

21472164
private void commitSync(Map<TopicPartition, OffsetAndMetadata> commits) {
2165+
doCommitSync(commits, 0);
2166+
}
2167+
2168+
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
21482169
try {
21492170
this.consumer.commitSync(commits, this.syncCommitTimeout);
21502171
}
2172+
catch (RetriableCommitFailedException e) {
2173+
if (retries >= this.containerProperties.getCommitRetries()) {
2174+
throw e;
2175+
}
2176+
doCommitSync(commits, retries + 1);
2177+
}
21512178
catch (RebalanceInProgressException e) {
21522179
this.logger.debug(e, "Non-fatal commit failure");
21532180
this.commitsDuringRebalance.putAll(commits);
@@ -2459,13 +2486,12 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
24592486
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
24602487
containerProps.getSyncCommitTimeout());
24612488
}
2462-
catch (RebalanceInProgressException e) {
2489+
catch (RetriableCommitFailedException | RebalanceInProgressException e) {
24632490
// ignore since this is on assignment anyway
24642491
}
24652492
}
24662493
else {
2467-
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2468-
containerProps.getCommitCallback());
2494+
commitAsync(offsetsToCommit, 0);
24692495
}
24702496
}
24712497
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.io.IOException;
3030
import java.io.ObjectOutputStream;
3131
import java.io.UncheckedIOException;
32-
import java.util.HashMap;
32+
import java.util.LinkedHashMap;
3333
import java.util.Map;
3434

3535
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -178,7 +178,7 @@ void tombstoneWithMultiTemplates() {
178178
KafkaOperations<?, ?> template1 = mock(KafkaOperations.class);
179179
given(template1.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
180180
KafkaOperations<?, ?> template2 = mock(KafkaOperations.class);
181-
Map<Class<?>, KafkaOperations<?, ?>> templates = new HashMap<>();
181+
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
182182
templates.put(String.class, template1);
183183
templates.put(Integer.class, template2);
184184
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
@@ -193,7 +193,7 @@ void tombstoneWithMultiTemplatesExplicit() {
193193
KafkaOperations<?, ?> template1 = mock(KafkaOperations.class);
194194
KafkaOperations<?, ?> template2 = mock(KafkaOperations.class);
195195
given(template2.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
196-
Map<Class<?>, KafkaOperations<?, ?>> templates = new HashMap<>();
196+
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
197197
templates.put(String.class, template1);
198198
templates.put(Void.class, template2);
199199
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
import org.apache.kafka.clients.consumer.ConsumerRecords;
6767
import org.apache.kafka.clients.consumer.KafkaConsumer;
6868
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
69+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
70+
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
6971
import org.apache.kafka.clients.producer.ProducerConfig;
7072
import org.apache.kafka.common.TopicPartition;
7173
import org.apache.kafka.common.errors.AuthorizationException;
@@ -3011,6 +3013,73 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
30113013
container.stop();
30123014
}
30133015

3016+
@Test
3017+
void testCommitSyncRetries() throws Exception {
3018+
testCommitRetriesGuts(true);
3019+
}
3020+
3021+
@Test
3022+
void testCommitAsyncRetries() throws Exception {
3023+
testCommitRetriesGuts(false);
3024+
}
3025+
3026+
@SuppressWarnings({ "unchecked", "rawtypes" })
3027+
private void testCommitRetriesGuts(boolean sync) throws Exception {
3028+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3029+
Consumer<Integer, String> consumer = mock(Consumer.class);
3030+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3031+
Map<String, Object> cfProps = new HashMap<>();
3032+
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
3033+
given(cf.getConfigurationProperties()).willReturn(cfProps);
3034+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
3035+
records.put(new TopicPartition("foo", 0), Arrays.asList(
3036+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
3037+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
3038+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3039+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3040+
AtomicBoolean first = new AtomicBoolean(true);
3041+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3042+
Thread.sleep(50);
3043+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3044+
});
3045+
CountDownLatch latch = new CountDownLatch(4);
3046+
if (sync) {
3047+
willAnswer(i -> {
3048+
latch.countDown();
3049+
throw new RetriableCommitFailedException("");
3050+
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
3051+
}
3052+
else {
3053+
willAnswer(i -> {
3054+
OffsetCommitCallback callback = i.getArgument(1);
3055+
callback.onComplete(i.getArgument(0), new RetriableCommitFailedException(""));
3056+
latch.countDown();
3057+
return null;
3058+
}).given(consumer).commitAsync(anyMap(), any());
3059+
}
3060+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3061+
new TopicPartitionOffset("foo", 0) };
3062+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3063+
containerProps.setSyncCommits(sync);
3064+
containerProps.setGroupId("grp");
3065+
containerProps.setClientId("clientId");
3066+
containerProps.setIdleEventInterval(100L);
3067+
containerProps.setMessageListener((MessageListener) r -> {
3068+
});
3069+
containerProps.setMissingTopicsFatal(false);
3070+
KafkaMessageListenerContainer<Integer, String> container =
3071+
new KafkaMessageListenerContainer<>(cf, containerProps);
3072+
container.start();
3073+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3074+
container.stop();
3075+
if (sync) {
3076+
verify(consumer, times(4)).commitSync(any(), any());
3077+
}
3078+
else {
3079+
verify(consumer, times(4)).commitAsync(any(), any());
3080+
}
3081+
}
3082+
30143083
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
30153084
Consumer<?, ?> consumer =
30163085
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments
 (0)