Skip to content

Commit bb3d3df

Browse files
committed
Fix Sonar issues for previous commit
1 parent a657371 commit bb3d3df

File tree

4 files changed

+27
-21
lines changed

4 files changed

+27
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> d
4141
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);
4242

4343
@Override
44-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
44+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
4545
MessageListenerContainer container) {
4646

4747
handle(thrownException, data, consumer);

spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ default void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> da
5050
void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);
5151

5252
@Override
53-
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
54-
MessageListenerContainer container) {
53+
default void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
54+
Consumer<?, ?> consumer, MessageListenerContainer container) {
55+
5556
handle(thrownException, records, consumer);
5657
}
5758

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.clients.consumer.ConsumerRecords;
2424

2525
import org.springframework.kafka.KafkaException;
26+
import org.springframework.lang.Nullable;
2627
import org.springframework.util.backoff.BackOff;
2728
import org.springframework.util.backoff.BackOffExecution;
2829

@@ -55,26 +56,28 @@ public void setBackOff(BackOff backOff) {
5556
}
5657

5758
@Override
58-
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
59+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
5960
MessageListenerContainer container) {
6061

61-
data.partitions()
62-
.stream()
63-
.collect(
64-
Collectors.toMap(tp -> tp,
65-
tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
66-
.forEach(consumer::seek);
62+
if (data != null) {
63+
data.partitions()
64+
.stream()
65+
.collect(
66+
Collectors.toMap(tp -> tp,
67+
tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
68+
.forEach(consumer::seek);
6769

68-
if (this.backOff != null) {
69-
try {
70-
ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals, container);
70+
if (this.backOff != null) {
71+
try {
72+
ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals, container);
73+
}
74+
catch (InterruptedException e) {
75+
Thread.currentThread().interrupt();
76+
}
7177
}
72-
catch (InterruptedException e) {
73-
Thread.currentThread().interrupt();
74-
}
75-
}
7678

77-
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
79+
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
80+
}
7881
}
7982

8083
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,13 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
106106
}
107107

108108
@Override
109-
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
109+
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
110110
Consumer<?, ?> consumer, MessageListenerContainer container) {
111111

112-
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
113-
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
112+
if (records != null) {
113+
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
114+
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
115+
}
114116
}
115117

116118
}

0 commit comments

Comments
 (0)