22
33namespace Ensi \LaravelPhpRdKafkaConsumer \Commands ;
44
5- use Ensi \LaravelPhpRdKafka \ KafkaFacade ;
6- use Ensi \LaravelPhpRdKafkaConsumer \ConsumerOptions ;
7- use Ensi \LaravelPhpRdKafkaConsumer \HighLevelConsumer ;
8- use Ensi \LaravelPhpRdKafkaConsumer \Logger \ ConsumerLoggerFactory ;
9- use Ensi \LaravelPhpRdKafkaConsumer \Logger \ ConsumerLoggerInterface ;
10- use Ensi \LaravelPhpRdKafkaConsumer \ProcessorData ;
5+ use Ensi \LaravelPhpRdKafkaConsumer \ Consumers \ Consumer ;
6+ use Ensi \LaravelPhpRdKafkaConsumer \Consumers \ Factories \ ConsumerFactory ;
7+ use Ensi \LaravelPhpRdKafkaConsumer \Exceptions \ KafkaConsumerException ;
8+ use Ensi \LaravelPhpRdKafkaConsumer \Exceptions \ KafkaConsumerProcessorException ;
9+ use Ensi \LaravelPhpRdKafkaConsumer \Loggers \ ConsumerLoggerFactory ;
10+ use Ensi \LaravelPhpRdKafkaConsumer \Loggers \ ConsumerLoggerInterface ;
1111use Illuminate \Console \Command ;
1212use Symfony \Component \Console \Command \SignalableCommandInterface ;
1313use Throwable ;
@@ -30,7 +30,12 @@ class KafkaConsumeCommand extends Command implements SignalableCommandInterface
3030 */
3131 protected $ description = 'Consume concrete topic ' ;
3232
33- protected ?HighLevelConsumer $ consumer = null ;
33+ protected ?Consumer $ consumer = null ;
34+
35+ public function __construct (protected ConsumerLoggerFactory $ loggerFactory )
36+ {
37+ parent ::__construct ();
38+ }
3439
3540 public function getStopSignalsFromConfig (): array
3641 {
@@ -42,6 +47,26 @@ public function getSubscribedSignals(): array
4247 return $ this ->getStopSignalsFromConfig ();
4348 }
4449
50+ public function getTopicKey (): string
51+ {
52+ return $ this ->argument ('topic-key ' );
53+ }
54+
55+ public function getConsumer (): string
56+ {
57+ return $ this ->argument ('consumer ' );
58+ }
59+
60+ public function getMaxEvents (): int
61+ {
62+ return $ this ->option ('once ' ) ? 1 : (int ) $ this ->option ('max-events ' );
63+ }
64+
65+ public function getMaxTime (): int
66+ {
67+ return (int ) $ this ->option ('max-time ' );
68+ }
69+
4570 public function handleSignal (int $ signal , int |false $ previousExitCode = 0 ): int |false
4671 {
4772 if ($ this ->consumer ) {
@@ -55,98 +80,50 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
5580 /**
5681 * Execute the console command.
5782 */
58- public function handle (HighLevelConsumer $ highLevelConsumer , ConsumerLoggerFactory $ loggerFactory ): int
83+ public function handle (ConsumerFactory $ consumerFactory ): int
5984 {
60- $ this ->consumer = $ highLevelConsumer ;
61- $ topicKey = $ this ->argument ('topic-key ' );
62- $ consumer = $ this ->argument ('consumer ' );
63-
64- $ logger = $ loggerFactory ->make ($ topicKey , $ consumer );
65-
66- $ processorData = $ this ->findMatchedProcessor ($ topicKey , $ consumer );
67- if (is_null ($ processorData )) {
68- $ this ->errorMessage ($ logger , "Processor for topic-key \"$ topicKey \" and consumer \"$ consumer \" is not found " );
69- $ this ->line ('Processors are set in /config/kafka-consumer.php ' );
70-
71- return 1 ;
72- }
73-
74- if (!class_exists ($ processorData ->class )) {
75- $ this ->errorMessage ($ logger , "Processor class \"$ processorData ->class \" is not found " );
76- $ this ->line ('Processors are set in /config/kafka-consumer.php ' );
77-
78- return 1 ;
79- }
80-
81- if (!$ processorData ->hasValidType ()) {
82- $ this ->errorMessage ($ logger , "Invalid processor type \"$ processorData ->type \", supported types are: " . implode (', ' , $ processorData ->getSupportedTypes ()));
83-
84- return 1 ;
85- }
86-
87- $ consumerPackageOptions = config ('kafka-consumer.consumer_options. ' . $ consumer , []);
88- $ consumerOptions = new ConsumerOptions (
89- consumeTimeout: $ consumerPackageOptions ['consume_timeout ' ] ?? $ processorData ->consumeTimeout ,
90- maxEvents: $ this ->option ('once ' ) ? 1 : (int ) $ this ->option ('max-events ' ),
91- maxTime: (int ) $ this ->option ('max-time ' ),
92- middleware: $ this ->collectMiddleware ($ consumerPackageOptions ['middleware ' ] ?? []),
93- );
85+ try {
86+ $ this ->consumer = $ consumerFactory
87+ ->build ($ this ->getTopicKey (), $ this ->getConsumer ());
9488
95- $ topicName = KafkaFacade:: topicNameByClient ( ' consumer ' , $ consumer , $ topicKey );
96- $ this -> info ( " Start listening to topic: \"{ $ topicKey }\" ( {$ topicName }), consumer \"{$ consumer }\"" );
89+ $ this -> info ( " Start listening to topic: \"{ $ this -> getTopicKey ()}\"" .
90+ " ( {$ this -> consumer -> getTopicName () }), consumer \"{$ this -> getConsumer () }\"" );
9791
98- try {
99- $ highLevelConsumer
100- ->for ( $ consumer )
101- ->listen ($ topicName , $ processorData , $ consumerOptions );
102- } catch (Throwable $ e ) {
103- $ this ->errorThrowable ($ logger , $ e );
92+ $ this -> consumer
93+ -> setMaxEvents ( $ this -> getMaxEvents ())
94+ ->setMaxTime ( $ this -> getMaxTime () )
95+ ->listen ();
96+ } catch (Throwable $ exception ) {
97+ $ this ->errorThrowable ($ exception );
10498
105- return 1 ;
99+ return self :: FAILURE ;
106100 }
107101
108- return 0 ;
102+ return self :: SUCCESS ;
109103 }
110104
111- protected function findMatchedProcessor ( string $ topic , string $ consumer ): ? ProcessorData
105+ private function errorThrowable ( Throwable $ exception ): void
112106 {
113- foreach (config ('kafka-consumer.processors ' , []) as $ processor ) {
114- $ topicMatched = empty ($ processor ['topic ' ]) || $ processor ['topic ' ] === $ topic ;
115- $ consumerMatched = empty ($ processor ['consumer ' ]) || $ processor ['consumer ' ] === $ consumer ;
116- if ($ topicMatched && $ consumerMatched ) {
117- return new ProcessorData (
118- class: $ processor ['class ' ],
119- topicKey: $ processor ['topic ' ] ?? null ,
120- consumer: $ processor ['consumer ' ] ?? null ,
121- type: $ processor ['type ' ] ?? 'action ' ,
122- queue: $ processor ['queue ' ] ?? false ,
123- consumeTimeout: $ processor ['consume_timeout ' ] ?? 20000 ,
124- );
125- }
126- }
107+ $ this ->makeLogger ()
108+ ->error ($ exception ->getMessage (), ['exception ' => $ exception ]);
127109
128- return null ;
129- }
110+ if ( $ exception instanceof KafkaConsumerException) {
111+ $ this -> error ( $ exception -> getMessage ());
130112
131- protected function collectMiddleware (array $ processorMiddleware ): array
132- {
133- return array_unique (
134- array_merge (
135- config ('kafka-consumer.global_middleware ' , []),
136- $ processorMiddleware
137- )
138- );
139- }
113+ if ($ exception instanceof KafkaConsumerProcessorException) {
114+ $ this ->line ('Processors are set in /config/kafka-consumer.php ' );
115+ }
140116
141- private function errorThrowable (ConsumerLoggerInterface $ logger , Throwable $ e ): void
142- {
143- $ logger ->error ($ e ->getMessage (), ['exception ' => $ e ]);
144- $ this ->error ('An error occurred while listening to the topic: ' . $ e ->getMessage (). ' ' . $ e ->getFile () . ':: ' . $ e ->getLine ());
117+ return ;
118+ }
119+
120+ $ this ->error ('An error occurred while listening to the topic: ' .
121+ $ exception ->getMessage (). ' ' . $ exception ->getFile () . ':: ' . $ exception ->getLine ());
145122 }
146123
147- private function errorMessage ( ConsumerLoggerInterface $ logger , string $ message ): void
124+ private function makeLogger ( ): ConsumerLoggerInterface
148125 {
149- $ logger -> error ( $ message );
150- $ this ->error ( $ message );
126+ return $ this -> loggerFactory
127+ -> make ( $ this ->getTopicKey (), $ this -> getConsumer () );
151128 }
152129}
0 commit comments