Skip to content

Commit 6a25089

Browse files
authored
Merge pull request #2 from ensi-platform/task-82065
#82065 more confi options of consumer
2 parents 180023e + 2c30c3e commit 6a25089

File tree

4 files changed

+53
-8
lines changed

4 files changed

+53
-8
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Now go to `config/kafka-consumer.php` and add processors there.
2020

2121
## Usage
2222

23-
The package provides `php artisan kafka:consume {topic} {consumer=default}` command that executes the first processor that matches given topic and consumer name. Consumer name is taken from ensi/laravel-phprdkafka config file.
23+
The package provides `php artisan kafka:consume {topic} {consumer=default} {--max-events=0} {--max-time=0} {--once}` command that executes the first processor that matches given topic and consumer name. Consumer name is taken from `ensi/laravel-phprdkafka config` file.
2424

2525
Processors in config have the following configuration options:
2626

src/Commands/KafkaConsumeCommand.php

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Ensi\LaravelPhpRdKafkaConsumer\Commands;
44

5+
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
56
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
67
use Throwable;
78
use Illuminate\Console\Command;
@@ -11,7 +12,13 @@ class KafkaConsumeCommand extends Command
1112
/**
1213
* The name and signature of the console command.
1314
*/
14-
protected $signature = 'kafka:consume {topic} {consumer=default}';
15+
protected $signature = 'kafka:consume
16+
{topic : The name of the topic}
17+
{consumer=default : The name of the consumer}
18+
{--max-events=0 : The number of events to consume before stopping}
19+
{--max-time=0 : The maximum number of seconds the worker should run}
20+
{--once : Only process the next event in the topic}
21+
';
1522

1623
/**
1724
* The console command description.
@@ -50,8 +57,6 @@ public function handle(): int
5057
return 1;
5158
}
5259

53-
$consumeTimeout = $processorData['consume_timeout'] ?? 20000;
54-
5560
$supportedProcessorTypes = ['action', 'job'];
5661
$processorType = $processorData['type'] ?? 'action';
5762
if (!in_array($processorType, $supportedProcessorTypes)) {
@@ -62,9 +67,15 @@ public function handle(): int
6267

6368
$processorQueue = $processorData['queue'] ?? false;
6469

70+
$consumerOptions = new ConsumerOptions(
71+
consumeTimeout: $processorData['consume_timeout'] ?? 20000,
72+
maxEvents: $this->option('once') ? 1 : (int) $this->option('max-events'),
73+
maxTime: (int) $this->option('max-time')
74+
);
75+
6576
$this->info("Start listenning to topic: \"$topic\", consumer \"$consumer\"");
6677
try {
67-
$kafkaTopicListener = new HighLevelConsumer($topic, $consumer, $consumeTimeout);
78+
$kafkaTopicListener = new HighLevelConsumer($topic, $consumer, $consumerOptions);
6879
$kafkaTopicListener->listen($processorClassName, $processorType, $processorQueue);
6980
} catch (Throwable $e) {
7081
$this->error('An error occurred while listening to the topic: '. $e->getMessage(). ' '. $e->getFile() . '::' . $e->getLine());

src/ConsumerOptions.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer;
4+
5+
class ConsumerOptions
6+
{
7+
public function __construct(
8+
public int $consumeTimeout = 20000,
9+
public int $maxEvents = 0,
10+
public int $maxTime = 0,
11+
)
12+
{
13+
}
14+
}

src/HighLevelConsumer.php

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class HighLevelConsumer
1616
public function __construct(
1717
protected string $topicName,
1818
?string $consumerName = null,
19-
protected int $consumeTimeout = 20000,
19+
protected ConsumerOptions $options,
2020
)
2121
{
2222
$manager = resolve(KafkaManager::class);
@@ -32,14 +32,17 @@ public function listen(string $processorClassName, string $processorType, string
3232
{
3333
$this->consumer->subscribe([ $this->topicName ]);
3434

35+
[$startTime, $eventsProcessed] = [hrtime(true) / 1e9, 0];
36+
3537
while (true) {
36-
$message = $this->consumer->consume($this->consumeTimeout);
38+
$message = $this->consumer->consume($this->options->consumeTimeout);
3739

3840
switch ($message->err) {
3941

4042
case RD_KAFKA_RESP_ERR_NO_ERROR:
4143
$this->executeProcessor($processorClassName, $processorType, $processorQueue, $message);
4244
$this->consumer->commitAsync($message);
45+
$eventsProcessed++;
4346
break;
4447

4548
case RD_KAFKA_RESP_ERR__TIMED_OUT:
@@ -51,6 +54,10 @@ public function listen(string $processorClassName, string $processorType, string
5154
default:
5255
throw new KafkaConsumerException('Kafka error: ' . $message->errstr());
5356
}
57+
58+
if ($this->shouldBeStopped($startTime, $eventsProcessed)) {
59+
break;
60+
}
5461
}
5562
}
5663

@@ -79,4 +86,17 @@ protected function executeQueueableProcessor(string $className, string $type, st
7986
is_string($queue) ? $processor->onQueue($queue)->execute($message) : $processor->execute($message);
8087
}
8188
}
82-
}
89+
90+
protected function shouldBeStopped(int|float $startTime, int $eventsProcessed): bool
91+
{
92+
if ($this->options->maxTime && hrtime(true) / 1e9 - $startTime >= $this->options->maxTime) {
93+
return true;
94+
}
95+
96+
if ($this->options->maxEvents && $eventsProcessed >= $this->options->maxEvents) {
97+
return true;
98+
}
99+
100+
return false;
101+
}
102+
}

0 commit comments

Comments
 (0)