80
80
import org .springframework .kafka .core .KafkaTemplate ;
81
81
import org .springframework .kafka .core .ProducerFactory ;
82
82
import org .springframework .kafka .listener .ContainerProperties ;
83
+ import org .springframework .kafka .listener .ContainerProperties .AssignmentCommitOption ;
83
84
import org .springframework .kafka .listener .KafkaMessageListenerContainer ;
84
85
import org .springframework .kafka .requestreply .ReplyingKafkaTemplate ;
85
86
import org .springframework .kafka .support .DefaultKafkaHeaderMapper ;
@@ -503,12 +504,14 @@ void testConsumeAndProduceTransaction() throws Exception {
503
504
ConsumerFactory cf = mock (ConsumerFactory .class );
504
505
willReturn (mockConsumer ).given (cf ).createConsumer ("group" , "" , null , KafkaTestUtils .defaultPropertyOverrides ());
505
506
Producer producer = mock (Producer .class );
507
+ given (producer .send (any (), any ())).willReturn (new SettableListenableFuture <>());
506
508
final CountDownLatch closeLatch = new CountDownLatch (2 );
507
509
willAnswer (i -> {
508
510
closeLatch .countDown ();
509
511
return null ;
510
512
}).given (producer ).close (any ());
511
513
ProducerFactory pf = mock (ProducerFactory .class );
514
+ given (pf .isProducerPerConsumerPartition ()).willReturn (true );
512
515
given (pf .transactionCapable ()).willReturn (true );
513
516
final List <String > transactionalIds = new ArrayList <>();
514
517
willAnswer (i -> {
@@ -519,7 +522,7 @@ void testConsumeAndProduceTransaction() throws Exception {
519
522
ContainerProperties props = new ContainerProperties ("foo" );
520
523
props .setGroupId ("group" );
521
524
props .setTransactionManager (ptm );
522
- props .setMissingTopicsFatal ( false );
525
+ props .setAssignmentCommitOption ( AssignmentCommitOption . ALWAYS );
523
526
final KafkaTemplate template = new KafkaTemplate (pf );
524
527
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer <>(cf , props );
525
528
container .setBeanName ("commit" );
@@ -613,6 +616,7 @@ void testConsumeAndProduceTransactionTxIdOverride() throws Exception {
613
616
ConsumerFactory cf = mock (ConsumerFactory .class );
614
617
willReturn (mockConsumer ).given (cf ).createConsumer ("group" , "" , null , KafkaTestUtils .defaultPropertyOverrides ());
615
618
Producer producer = mock (Producer .class );
619
+ given (producer .send (any (), any ())).willReturn (new SettableListenableFuture <>());
616
620
final CountDownLatch closeLatch = new CountDownLatch (2 );
617
621
willAnswer (i -> {
618
622
closeLatch .countDown ();
@@ -637,7 +641,7 @@ protected Producer createTransactionalProducerForPartition(String txIdPrefix) {
637
641
ContainerProperties props = new ContainerProperties ("foo" );
638
642
props .setGroupId ("group" );
639
643
props .setTransactionManager (tm );
640
- props .setMissingTopicsFatal ( false );
644
+ props .setAssignmentCommitOption ( AssignmentCommitOption . ALWAYS );
641
645
final KafkaTemplate template = new KafkaTemplate (pf );
642
646
template .setTransactionIdPrefix ("template.tx.id." );
643
647
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer <>(cf , props );
0 commit comments