@@ -19,16 +19,24 @@ public function __construct(
1919 /**
2020 * @throws KafkaConsumerProcessorException
2121 */
22- public function build (string $ topicKey , string $ consumerName = 'default ' ): Consumer
22+ public function build (array $ topicKeys , string $ consumerName = 'default ' ): Consumer
2323 {
24- $ processorData = $ this ->makeProcessorData ($ topicKey , $ consumerName );
25- $ consumerOptions = $ this ->makeConsumerOptions ($ consumerName , $ processorData );
24+ $ topicNames = [];
25+ $ processors = [];
26+ foreach ($ topicKeys as $ topicKey ) {
27+ $ topicName = KafkaFacade::topicNameByClient ('consumer ' , $ consumerName , $ topicKey );
28+ $ topicNames [] = $ topicName ;
29+ $ processors [$ topicName ] = $ this ->makeProcessorData ($ topicKey , $ consumerName );
30+ }
31+
32+ $ consumerOptions = $ this ->makeConsumerOptions ($ consumerName );
2633
2734 return new Consumer (
2835 highLevelConsumer: $ this ->highLevelConsumer ,
29- processorData: $ processorData ,
36+ processorData: $ processors ,
3037 consumerOptions: $ consumerOptions ,
31- topicName: KafkaFacade::topicNameByClient ('consumer ' , $ consumerName , $ topicKey )
38+ topicNames: $ topicNames ,
39+ consumerName: $ consumerName ,
3240 );
3341 }
3442
@@ -39,10 +47,6 @@ protected function makeProcessorData(string $topicKey, string $consumerName): Pr
3947 {
4048 $ processorData = $ this ->findMatchedProcessor ($ topicKey , $ consumerName );
4149
42- if (!class_exists ($ processorData ->class )) {
43- throw new KafkaConsumerProcessorException ("Processor class \"$ processorData ->class \" is not found " );
44- }
45-
4650 if (!$ processorData ->hasValidType ()) {
4751 throw new KafkaConsumerProcessorException ("Invalid processor type \"$ processorData ->type \", " .
4852 " supported types are: " . implode (', ' , $ processorData ->getSupportedTypes ()));
@@ -75,12 +79,12 @@ class: $processor['class'],
7579 throw new KafkaConsumerProcessorException ("Processor for topic-key \"$ topicKey \" and consumer \"$ consumerName \" is not found " );
7680 }
7781
78- protected function makeConsumerOptions (string $ consumerName, ProcessorData $ processorData ): ConsumerOptions
82+ protected function makeConsumerOptions (string $ consumerName ): ConsumerOptions
7983 {
8084 $ consumerPackageOptions = config ('kafka-consumer.consumer_options. ' . $ consumerName , []);
8185
8286 return new ConsumerOptions (
83- consumeTimeout: $ consumerPackageOptions ['consume_timeout ' ] ?? $ processorData -> consumeTimeout ,
87+ consumeTimeout: $ consumerPackageOptions ['consume_timeout ' ] ?? 20000 ,
8488 middleware: $ this ->collectMiddleware ($ consumerPackageOptions ['middleware ' ] ?? []),
8589 );
8690 }
0 commit comments