Skip to content

Commit d539d1d

Browse files
author
Ivan Koryukov
committed
#102492 работа с топиком из списка
* ломающее изменение: теперь kafka:consume и kafka-consumer.processors.topic принимают не название топика, а ключ в kafka.topics * добавлен пакет kwn/php-rdkafka-stubs
1 parent 29de855 commit d539d1d

File tree

5 files changed

+16
-11
lines changed

5 files changed

+16
-11
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ psalm.xml
1010
testbench.yaml
1111
vendor
1212
node_modules
13+
studio.json

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": "^8.0",
2020
"ext-rdkafka": "*",
21-
"ensi/laravel-phprdkafka": "^0.2",
21+
"ensi/laravel-phprdkafka": "dev-task-102492",
2222
"illuminate/contracts": "^8.37 || ^9.0",
2323
"illuminate/pipeline": "^8.37 || ^9.0",
2424
"illuminate/support": "^8.37 || ^9.0"

src/Commands/KafkaConsumeCommand.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Ensi\LaravelPhpRdKafkaConsumer\Commands;
44

5+
use Ensi\LaravelPhpRdKafka\KafkaFacade;
56
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
67
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
78
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
@@ -15,7 +16,7 @@ class KafkaConsumeCommand extends Command implements SignalableCommandInterface
1516
* The name and signature of the console command.
1617
*/
1718
protected $signature = 'kafka:consume
18-
{topic : The name of the topic}
19+
{topic-key : The key of a topic in the kafka.topics list}
1920
{consumer=default : The name of the consumer}
2021
{--max-events=0 : The number of events to consume before stopping}
2122
{--max-time=0 : The maximum number of seconds the worker should run}
@@ -53,7 +54,7 @@ public function handleSignal(int $signal): void
5354
public function handle(HighLevelConsumer $highLevelConsumer): int
5455
{
5556
$this->consumer = $highLevelConsumer;
56-
$topic = $this->argument('topic');
57+
$topicKey = $this->argument('topic-key');
5758
$consumer = $this->argument('consumer');
5859
$availableConsumers = array_keys(config('kafka.consumers', []));
5960

@@ -64,9 +65,9 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
6465
return 1;
6566
}
6667

67-
$processorData = $this->findMatchedProcessor($topic, $consumer);
68+
$processorData = $this->findMatchedProcessor($topicKey, $consumer);
6869
if (is_null($processorData)) {
69-
$this->error("Processor for topic \"$topic\" and consumer \"$consumer\" is not found");
70+
$this->error("Processor for topic-key \"$topicKey\" and consumer \"$consumer\" is not found");
7071
$this->line('Processors are set in /config/kafka-consumers.php');
7172

7273
return 1;
@@ -93,12 +94,13 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
9394
middleware: $this->collectMiddleware($consumerPackageOptions['middleware'] ?? []),
9495
);
9596

96-
$this->info("Start listenning to topic: \"$topic\", consumer \"$consumer\"");
97+
$topicName = KafkaFacade::topicName($topicKey);
98+
$this->info("Start listenning to topic: \"$topicName\", consumer \"$consumer\"");
9799

98100
try {
99101
$highLevelConsumer
100102
->for($consumer)
101-
->listen($topic, $processorData, $consumerOptions);
103+
->listen($topicName, $processorData, $consumerOptions);
102104
} catch (Throwable $e) {
103105
$this->error('An error occurred while listening to the topic: '. $e->getMessage(). ' '. $e->getFile() . '::' . $e->getLine());
104106

@@ -117,7 +119,7 @@ protected function findMatchedProcessor(string $topic, string $consumer): ?Proce
117119
) {
118120
return new ProcessorData(
119121
class: $processor['class'],
120-
topic: $processor['topic'] ?? null,
122+
topicKey: $processor['topic'] ?? null,
121123
consumer: $processor['consumer'] ?? null,
122124
type: $processor['type'] ?? 'action',
123125
queue: $processor['queue'] ?? false,

src/HighLevelConsumer.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Ensi\LaravelPhpRdKafka\KafkaManager;
66
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerException;
7+
use Illuminate\Foundation\Bus\Dispatchable;
78
use Illuminate\Pipeline\Pipeline;
89
use RdKafka\Exception as RdKafkaException;
910
use RdKafka\KafkaConsumer;
@@ -15,7 +16,7 @@ class HighLevelConsumer
1516
{
1617
protected ?KafkaConsumer $consumer;
1718

18-
protected $forceStop = false;
19+
protected bool $forceStop = false;
1920

2021
public function __construct(
2122
protected KafkaManager $kafkaManager,
@@ -40,7 +41,6 @@ public function forceStop(): static
4041
}
4142

4243
/**
43-
* @throws KafkaException
4444
* @throws RdKafkaException
4545
* @throws Throwable
4646
*/
@@ -95,6 +95,7 @@ protected function executeProcessor(ProcessorData $processorData, Message $messa
9595

9696
protected function executeSyncProcessor(ProcessorData $processorData, Message $message): void
9797
{
98+
/** @var class-string|Dispatchable $className */
9899
$className = $processorData->class;
99100
if ($processorData->type === 'job') {
100101
$className::dispatchSync($message);
@@ -105,6 +106,7 @@ protected function executeSyncProcessor(ProcessorData $processorData, Message $m
105106

106107
protected function executeQueueableProcessor(ProcessorData $processorData, Message $message): void
107108
{
109+
/** @var class-string|Dispatchable $className */
108110
$className = $processorData->class;
109111
$queue = $processorData->queue;
110112
if ($processorData->type === 'job') {

src/ProcessorData.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class ProcessorData
88

99
public function __construct(
1010
public string $class,
11-
public ?string $topic = null,
11+
public ?string $topicKey = null,
1212
public ?string $consumer = null,
1313
public string $type = 'action',
1414
public string|bool $queue = false,

0 commit comments

Comments
 (0)