1616
1717package org .springframework .kafka .listener ;
1818
19+ import java .time .Duration ;
20+ import java .util .ArrayList ;
21+ import java .util .HashMap ;
22+ import java .util .List ;
23+ import java .util .Map ;
24+ import java .util .concurrent .CountDownLatch ;
25+ import java .util .concurrent .TimeUnit ;
26+
1927import org .apache .kafka .clients .consumer .Consumer ;
2028import org .apache .kafka .clients .consumer .ConsumerRecord ;
2129import org .apache .kafka .clients .consumer .ConsumerRecords ;
2230import org .apache .kafka .common .TopicPartition ;
2331import org .junit .jupiter .api .Test ;
32+
2433import org .springframework .kafka .core .ConsumerFactory ;
2534import org .springframework .kafka .listener .adapter .FilteringMessageListenerAdapter ;
2635import org .springframework .kafka .listener .adapter .RecordFilterStrategy ;
2736
28- import java .time .Duration ;
29- import java .util .*;
30- import java .util .concurrent .CountDownLatch ;
31- import java .util .concurrent .TimeUnit ;
32-
3337import static org .assertj .core .api .Assertions .assertThat ;
3438import static org .mockito .ArgumentMatchers .any ;
3539import static org .mockito .ArgumentMatchers .anyMap ;
3640import static org .mockito .BDDMockito .given ;
37- import static org .mockito .Mockito .*;
41+ import static org .mockito .Mockito .mock ;
42+ import static org .mockito .Mockito .times ;
43+ import static org .mockito .Mockito .verify ;
3844
3945/**
4046 * Tests to verify the behavior of RECORD acknowledge mode when used with filtering strategies.
4652 */
4753public class AckModeRecordWithFilteringTest {
4854
49- @ SuppressWarnings ("unchecked" )
55+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
5056 @ Test
5157 public void testCurrentRecordModeCommitsAllRecords () throws InterruptedException {
5258 // Given: A container with RECORD ack mode and a filter that filters out even offsets
@@ -89,7 +95,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException
8995
9096 given (consumer .poll (any (Duration .class )))
9197 .willReturn (consumerRecords )
92- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
98+ .willReturn (ConsumerRecords . empty ( ));
9399
94100 // When: Start the container and process records
95101 container .start ();
@@ -103,7 +109,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException
103109 verify (consumer , times (4 )).commitSync (any (), any (Duration .class ));
104110 }
105111
106- @ SuppressWarnings ("unchecked" )
112+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
107113 @ Test
108114 public void testAllRecordsFilteredStillCommits () throws InterruptedException {
109115 // Given: A container where all records are filtered
@@ -139,7 +145,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException {
139145
140146 given (consumer .poll (any (Duration .class )))
141147 .willReturn (consumerRecords )
142- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
148+ .willReturn (ConsumerRecords . empty ( ));
143149
144150 // When: Start the container
145151 container .start ();
@@ -151,7 +157,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException {
151157 verify (consumer , times (2 )).commitSync (any (), any (Duration .class ));
152158 }
153159
154- @ SuppressWarnings ("unchecked" )
160+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
155161 @ Test
156162 public void testMixedPartitionsWithFiltering () throws InterruptedException {
157163 // Given: Multiple partitions with different records
@@ -201,7 +207,7 @@ record -> record.value().contains("skip");
201207
202208 given (consumer .poll (any (Duration .class )))
203209 .willReturn (consumerRecords )
204- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
210+ .willReturn (ConsumerRecords . empty ( ));
205211
206212 // When: Start container
207213 container .start ();
@@ -215,7 +221,7 @@ record -> record.value().contains("skip");
215221 verify (consumer , times (5 )).commitSync (any (), any (Duration .class ));
216222 }
217223
218- @ SuppressWarnings ("unchecked" )
224+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
219225 @ Test
220226 public void testCommitLogging () throws InterruptedException {
221227 ConsumerFactory <String , String > consumerFactory = mock (ConsumerFactory .class );
@@ -251,7 +257,7 @@ public void testCommitLogging() throws InterruptedException {
251257
252258 given (consumer .poll (any (Duration .class )))
253259 .willReturn (consumerRecords )
254- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
260+ .willReturn (ConsumerRecords . empty ( ));
255261
256262 // When
257263 container .start ();
@@ -262,7 +268,7 @@ public void testCommitLogging() throws InterruptedException {
262268 verify (consumer , times (2 )).commitSync (anyMap (), any (Duration .class ));
263269 }
264270
265- @ SuppressWarnings ("unchecked" )
271+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
266272 @ Test
267273 public void testAckDiscardedParameterBehavior () throws InterruptedException {
268274 ConsumerFactory <String , String > consumerFactory = mock (ConsumerFactory .class );
@@ -303,7 +309,7 @@ public void testAckDiscardedParameterBehavior() throws InterruptedException {
303309
304310 given (consumer .poll (any (Duration .class )))
305311 .willReturn (consumerRecords )
306- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
312+ .willReturn (ConsumerRecords . empty ( ));
307313
308314 container .start ();
309315 assertThat (processedLatch .await (5 , TimeUnit .SECONDS )).isTrue ();
0 commit comments