55use Ensi \LaravelPhpRdKafka \KafkaFacade ;
66use Ensi \LaravelPhpRdKafkaConsumer \ConsumerOptions ;
77use Ensi \LaravelPhpRdKafkaConsumer \HighLevelConsumer ;
8+ use Ensi \LaravelPhpRdKafkaConsumer \Logger \ConsumerLoggerFactory ;
9+ use Ensi \LaravelPhpRdKafkaConsumer \Logger \ConsumerLoggerInterface ;
810use Ensi \LaravelPhpRdKafkaConsumer \ProcessorData ;
911use Illuminate \Console \Command ;
10- use Illuminate \Support \Facades \Log ;
1112use Symfony \Component \Console \Command \SignalableCommandInterface ;
1213use Throwable ;
1314
@@ -54,29 +55,31 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
5455 /**
5556 * Execute the console command.
5657 */
57- public function handle (HighLevelConsumer $ highLevelConsumer ): int
58+ public function handle (HighLevelConsumer $ highLevelConsumer, ConsumerLoggerFactory $ loggerFactory ): int
5859 {
5960 $ this ->consumer = $ highLevelConsumer ;
6061 $ topicKey = $ this ->argument ('topic-key ' );
6162 $ consumer = $ this ->argument ('consumer ' );
6263
64+ $ logger = $ loggerFactory ->make ($ topicKey , $ consumer );
65+
6366 $ processorData = $ this ->findMatchedProcessor ($ topicKey , $ consumer );
6467 if (is_null ($ processorData )) {
65- $ this ->error ( "Processor for topic-key \"$ topicKey \" and consumer \"$ consumer \" is not found " );
68+ $ this ->errorMessage ( $ logger , "Processor for topic-key \"$ topicKey \" and consumer \"$ consumer \" is not found " );
6669 $ this ->line ('Processors are set in /config/kafka-consumer.php ' );
6770
6871 return 1 ;
6972 }
7073
7174 if (!class_exists ($ processorData ->class )) {
72- $ this ->error ( "Processor class \"$ processorData ->class \" is not found " );
75+ $ this ->errorMessage ( $ logger , "Processor class \"$ processorData ->class \" is not found " );
7376 $ this ->line ('Processors are set in /config/kafka-consumer.php ' );
7477
7578 return 1 ;
7679 }
7780
7881 if (!$ processorData ->hasValidType ()) {
79- $ this ->error ( "Invalid processor type \"$ processorData ->type \", supported types are: " . implode (', ' , $ processorData ->getSupportedTypes ()));
82+ $ this ->errorMessage ( $ logger , "Invalid processor type \"$ processorData ->type \", supported types are: " . implode (', ' , $ processorData ->getSupportedTypes ()));
8083
8184 return 1 ;
8285 }
@@ -97,9 +100,7 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
97100 ->for ($ consumer )
98101 ->listen ($ topicName , $ processorData , $ consumerOptions );
99102 } catch (Throwable $ e ) {
100- Log::error ($ e ->getMessage (), ['exception ' => $ e ]);
101-
102- $ this ->error ('An error occurred while listening to the topic: ' . $ e ->getMessage (). ' ' . $ e ->getFile () . ':: ' . $ e ->getLine ());
103+ $ this ->errorThrowable ($ logger , $ e );
103104
104105 return 1 ;
105106 }
@@ -136,4 +137,16 @@ protected function collectMiddleware(array $processorMiddleware): array
136137 )
137138 );
138139 }
140+
141+ private function errorThrowable (ConsumerLoggerInterface $ logger , Throwable $ e ): void
142+ {
143+ $ logger ->error ($ e ->getMessage (), ['exception ' => $ e ]);
144+ $ this ->error ('An error occurred while listening to the topic: ' . $ e ->getMessage (). ' ' . $ e ->getFile () . ':: ' . $ e ->getLine ());
145+ }
146+
147+ private function errorMessage (ConsumerLoggerInterface $ logger , string $ message ): void
148+ {
149+ $ logger ->error ($ message );
150+ $ this ->error ($ message );
151+ }
139152}
0 commit comments