Skip to content

Commit 2d43d6b

Browse files
garyrussellartembilan
authored andcommitted
GH-1804: Add ExponentialBackOffWithMaxRetries
Resolves #1804 Simply a convenience to automatically calculate the max elapsed time.
1 parent eef4389 commit 2d43d6b

File tree

4 files changed

+189
-0
lines changed

4 files changed

+189
-0
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5587,6 +5587,29 @@ Starting with version 2.7, the recoverer checks that the partition selected by t
55875587
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
55885588
You can disable this check by setting the `verifyPartition` property to `false`.
55895589

5590+
[[exp-backoff]]
5591+
===== `ExponentialBackOffWithMaxRetries` Implementation
5592+
5593+
Spring Framework provides a number of `BackOff` implementations.
5594+
By default, the `ExponentialBackOff` will retry indefinitely; to give up after some number of retry attempts requires calculating the `maxElapsedTime`.
5595+
Since version 2.7.3, Spring for Apache Kafka provides the `ExponentialBackOffWithMaxRetries` which is a subclass that receives the `maxRetries` property and automatically calculates the `maxElapsedTime`, which is a little more convenient.
5596+
5597+
====
5598+
[source, java]
5599+
----
5600+
@Bean
5601+
SeekToCurrentErrorHandler handler() {
5602+
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
5603+
bo.setInitialInterval(1_000L);
5604+
bo.setMultiplier(2.0);
5605+
bo.setMaxInterval(10_000L);
5606+
return new SeekToCurrentErrorHandler(myRecoverer, bo);
5607+
}
5608+
----
5609+
====
5610+
5611+
This will retry after `1, 2, 4, 8, 10, 10` seconds, before calling the recoverer.
5612+
55905613
[[kerberos]]
55915614
==== JAAS and Kerberos
55925615

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,9 @@ See <<messaging-message-conversion>> for more information.
9999
==== Sequencing `@KafkaListener` s
100100

101101
See <<container-sequencing>> for more information.
102+
103+
[[x27-exp-backoff]]
104+
==== `ExponentialBackOffWithMaxRetries`
105+
106+
A new `BackOff` implementation is provided, making it more convenient to configure the max retries.
107+
See <<exp-backoff>> for more information.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import org.springframework.util.backoff.ExponentialBackOff;
20+
21+
/**
22+
* Subclass of {@link ExponentialBackOff} that allows the specification of the maximum
23+
* number of retries rather than the maximum elapsed time.
24+
*
25+
* @author Gary Russell
26+
* @since 2.7.3
27+
*
28+
*/
29+
public class ExponentialBackOffWithMaxRetries extends ExponentialBackOff {
30+
31+
private final int maxRetries;
32+
33+
/**
34+
* Construct an instance that will calculate the {@link #setMaxElapsedTime(long)} from
35+
* the maxRetries.
36+
* @param maxRetries the max retries.
37+
*/
38+
public ExponentialBackOffWithMaxRetries(int maxRetries) {
39+
this.maxRetries = maxRetries;
40+
calculateMaxElapsed();
41+
}
42+
43+
/**
44+
* Get the max retries.
45+
* @return the max retries.
46+
*/
47+
public int getMaxRetries() {
48+
return this.maxRetries;
49+
}
50+
51+
@Override
52+
public void setInitialInterval(long initialInterval) {
53+
super.setInitialInterval(initialInterval);
54+
calculateMaxElapsed();
55+
}
56+
57+
@Override
58+
public void setMultiplier(double multiplier) {
59+
super.setMultiplier(multiplier);
60+
calculateMaxElapsed();
61+
}
62+
63+
@Override
64+
public void setMaxInterval(long maxInterval) {
65+
super.setMaxInterval(maxInterval);
66+
calculateMaxElapsed();
67+
}
68+
69+
@Override
70+
public void setMaxElapsedTime(long maxElapsedTime) {
71+
throw new IllegalStateException("'maxElapsedTime' is calculated from the 'maxRetries' property");
72+
}
73+
74+
private void calculateMaxElapsed() {
75+
long maxInterval = getMaxInterval();
76+
long maxElapsed = Math.min(getInitialInterval(), maxInterval);
77+
long current = maxElapsed;
78+
for (int i = 1; i < this.maxRetries; i++) {
79+
long next = Math.min((long) (current * getMultiplier()), maxInterval);
80+
current = next;
81+
maxElapsed += current;
82+
}
83+
super.setMaxElapsedTime(maxElapsed);
84+
}
85+
86+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.stream.IntStream;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.util.backoff.BackOffExecution;
29+
30+
/**
31+
* @author Gary Russell
32+
* @since 2.7.3
33+
*
34+
*/
35+
public class ExponentialBackOffWithMaxRetriesTests {
36+
37+
@Test
38+
void calcAll() {
39+
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(10);
40+
bo.setInitialInterval(1_000L);
41+
bo.setMultiplier(2.0);
42+
bo.setMaxInterval(10_000L);
43+
assertThatIllegalStateException().isThrownBy(() -> bo.setMaxElapsedTime(42L));
44+
assertThat(bo.getMaxRetries()).isEqualTo(10);
45+
List<Long> delays = new ArrayList<>();
46+
BackOffExecution boEx = bo.start();
47+
IntStream.range(0, 11).forEach(i -> delays.add(boEx.nextBackOff()));
48+
assertThat(delays).containsExactly(1_000L, 2_000L, 4_000L, 8_000L, 10_000L, 10_000L, 10_000L, 10_000L, 10_000L,
49+
10_000L, -1L);
50+
}
51+
52+
@Test
53+
void calcMaxLessThanInitial() {
54+
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(3);
55+
bo.setInitialInterval(10_000L);
56+
bo.setMultiplier(2.0);
57+
bo.setMaxInterval(5_000L);
58+
List<Long> delays = new ArrayList<>();
59+
BackOffExecution boEx = bo.start();
60+
IntStream.range(0, 4).forEach(i -> delays.add(boEx.nextBackOff()));
61+
assertThat(delays).containsExactly(5_000L, 5_000L, 5_000L, -1L);
62+
}
63+
64+
@Test
65+
void calcDefault() {
66+
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(10);
67+
List<Long> delays = new ArrayList<>();
68+
BackOffExecution boEx = bo.start();
69+
IntStream.range(0, 11).forEach(i -> delays.add(boEx.nextBackOff()));
70+
assertThat(delays).containsExactly(2_000L, 3_000L, 4_500L, 6_750L, 10_125L, 15_187L, 22_780L, 30_000L, 30_000L,
71+
30_000L, -1L);
72+
}
73+
74+
}

0 commit comments

Comments
 (0)