@@ -56,14 +56,6 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
5656 $ this ->consumer = $ highLevelConsumer ;
5757 $ topicKey = $ this ->argument ('topic-key ' );
5858 $ consumer = $ this ->argument ('consumer ' );
59- $ availableConsumers = array_keys (config ('kafka.consumers ' , []));
60-
61- if (!in_array ($ consumer , $ availableConsumers )) {
62- $ this ->error ("Unknown consumer \"$ consumer \"" );
63- $ this ->line ('Available consumers are: " ' . implode (', ' , $ availableConsumers ) . '" and can be found in /config/kafka.php ' );
64-
65- return 1 ;
66- }
6759
6860 $ processorData = $ this ->findMatchedProcessor ($ topicKey , $ consumer );
6961 if (is_null ($ processorData )) {
@@ -94,8 +86,8 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
9486 middleware: $ this ->collectMiddleware ($ consumerPackageOptions ['middleware ' ] ?? []),
9587 );
9688
97- $ topicName = KafkaFacade::topicName ( $ topicKey );
98- $ this ->info ("Start listenning to topic: \"$ topicName \" , consumer \"$ consumer \"" );
89+ $ topicName = KafkaFacade::topicNameByClient ( ' consumer ' , $ consumer , $ topicKey );
90+ $ this ->info ("Start listening to topic: \"{ $ topicKey }\" ( { $ topicName } ) , consumer \"{ $ consumer} \"" );
9991
10092 try {
10193 $ highLevelConsumer
@@ -113,10 +105,9 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
113105 protected function findMatchedProcessor (string $ topic , string $ consumer ): ?ProcessorData
114106 {
115107 foreach (config ('kafka-consumer.processors ' , []) as $ processor ) {
116- if (
117- (empty ($ processor ['topic ' ]) || $ processor ['topic ' ] === $ topic )
118- && (empty ($ processor ['consumer ' ]) || $ processor ['consumer ' ] === $ consumer )
119- ) {
108+ $ topicMatched = empty ($ processor ['topic ' ]) || $ processor ['topic ' ] === $ topic ;
109+ $ consumerMatched = empty ($ processor ['consumer ' ]) || $ processor ['consumer ' ] === $ consumer ;
110+ if ($ topicMatched && $ consumerMatched ) {
120111 return new ProcessorData (
121112 class: $ processor ['class ' ],
122113 topicKey: $ processor ['topic ' ] ?? null ,
0 commit comments