Skip to content

Commit d0185de

Browse files
committed
[upd] Обновление из master ветки и небольшие улучшения и исправления
1 parent 3bba09e commit d0185de

File tree

11 files changed

+138
-91
lines changed

11 files changed

+138
-91
lines changed

README.md

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -156,42 +156,20 @@ You can use any of the constants defined by the pcntl extension https://www.php.
156156

157157
Please see [CONTRIBUTING](.github/CONTRIBUTING.md) for details.
158158

159-
## Consumer testing
159+
## Consumer faking
160160

161161
Testing tools have been added to test the developed handlers. You can create a fake
162162
Consumer and call the topic listening command:
163163

164164
```php
165165
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
166-
use Ensi\LaravelPhpRdKafkaConsumer\Tests\KafkaManagerFaker;
166+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
167167
use RdKafka\Message;
168168

169-
test('test consume apache kafka', function () {
170-
KafkaManagerFaker::new('test-model')
171-
->addMessage(new Message())
172-
->bind();
173-
174-
artisan(KafkaConsumeCommand::class, ['topic-key' => 'test-model'])
175-
->assertOk();
176-
});
177-
```
178-
179-
or
180-
181-
```php
182-
use Ensi\LaravelPhpRdKafkaConsumer\Consumers\Factories\ConsumerFactory;
183-
use Ensi\LaravelPhpRdKafkaConsumer\Tests\KafkaManagerFaker;
184-
use RdKafka\Message;
185-
186-
test('test consume apache kafka', function () {
187-
KafkaManagerFaker::new('test-model')
188-
->addMessage(new Message())
189-
->bind();
190-
191-
resovle(ConsumerFactory::class)
192-
->build('test-model')
193-
->listen();
194-
});
169+
ConsumerFaker::new('test-model')
170+
->addMessage(new Message())
171+
->addMessage(new Message())
172+
->consume();
195173
```
196174

197175
## Testing

src/Commands/KafkaConsumeCommand.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public function handle(ConsumerFactory $consumerFactory): int
8686
$this->consumer = $consumerFactory
8787
->build($this->getTopicKey(), $this->getConsumer());
8888

89-
$this->info("Start listening to topic: \"{$this->getTopicKey()}\"".
89+
$this->info("Start listening to topic: \"{$this->getTopicKey()}\"" .
9090
" ({$this->consumer->getTopicName()}), consumer \"{$this->getConsumer()}\"");
9191

9292
$this->consumer
@@ -117,8 +117,8 @@ private function errorThrowable(Throwable $exception): void
117117
return;
118118
}
119119

120-
$this->error('An error occurred while listening to the topic: '.
121-
$exception->getMessage(). ' '. $exception->getFile() . '::' . $exception->getLine());
120+
$this->error('An error occurred while listening to the topic: ' .
121+
$exception->getMessage() . ' ' . $exception->getFile() . '::' . $exception->getLine());
122122
}
123123

124124
private function makeLogger(): ConsumerLoggerInterface

src/Consumers/Factories/ConsumerFactory.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected function makeProcessorData(string $topicKey, string $consumer): Proces
5252
}
5353

5454
if (!$processorData->hasValidType()) {
55-
throw new KafkaConsumerProcessorException("Invalid processor type \"$processorData->type\",".
55+
throw new KafkaConsumerProcessorException("Invalid processor type \"$processorData->type\"," .
5656
" supported types are: " . implode(',', $processorData->getSupportedTypes()));
5757
}
5858

@@ -89,7 +89,7 @@ class: $processor['class'],
8989

9090
protected function makeConsumerOptions(string $consumer, ProcessorData $processorData): ConsumerOptions
9191
{
92-
$consumerPackageOptions = config('kafka-consumer.consumer_options.'. $consumer, []);
92+
$consumerPackageOptions = config('kafka-consumer.consumer_options.' . $consumer, []);
9393

9494
return new ConsumerOptions(
9595
consumeTimeout: $consumerPackageOptions['consume_timeout'] ?? $processorData->consumeTimeout,

src/Exceptions/KafkaConsumerProcessorException.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,4 @@
44

55
final class KafkaConsumerProcessorException extends KafkaConsumerException
66
{
7-
87
}

src/Tests/Consumer/KafkaConsumer.php

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerMessagedEndedException;
66
use Ensi\LaravelPhpRdKafkaConsumer\Tests\Consumer\Topics\Metadata;
7+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\Consumer\Topics\Topic;
78
use RdKafka\Conf;
89
use RdKafka\KafkaConsumer as BaseKafkaConsumer;
910
use RdKafka\TopicConf;
@@ -39,19 +40,34 @@ public function consume($timeout_ms)
3940
return array_shift($this->messages);
4041
}
4142

42-
public function getMetadata($all_topics, $only_topic, $timeout_ms): Metadata
43+
/**
44+
* @param $all_topics
45+
* @param $only_topic
46+
* @param $timeout_ms
47+
* @return Metadata
48+
*
49+
* @phpstan-ignore-next-line
50+
*/
51+
public function getMetadata($all_topics, $only_topic = null, $timeout_ms): Metadata
4352
{
4453
return $this->metadata;
4554
}
4655

47-
public function getCommittedOffsets($topicPartitions, $timeout_ms): array
56+
public function getCommittedOffsets($topic_partitions, $timeout_ms): array
4857
{
49-
return $topicPartitions;
58+
return $topic_partitions;
5059
}
5160

52-
public function newTopic($topic_name, TopicConf $topic_conf = null)
61+
/**
62+
* @param $topic_name
63+
* @param TopicConf|null $topic_conf
64+
* @return Topic
65+
*
66+
* @phpstan-ignore-next-line
67+
*/
68+
public function newTopic($topic_name, $topic_conf = null): Topic
5369
{
54-
70+
return new Topic($topic_name);
5571
}
5672

5773
public function commitAsync($message_or_offsets = null)

src/Tests/ConsumerFaker.php

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Tests;
4+
5+
use Ensi\LaravelPhpRdKafka\KafkaFacade;
6+
use Ensi\LaravelPhpRdKafka\KafkaManager as BaseKafkaManager;
7+
use Ensi\LaravelPhpRdKafkaConsumer\Consumers\Factories\ConsumerFactory;
8+
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerProcessorException;
9+
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
10+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\Consumer\KafkaConsumer;
11+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\Exceptions\OnlyTestingEnvironmentException;
12+
use RdKafka\Message;
13+
use Throwable;
14+
15+
class ConsumerFaker
16+
{
17+
protected array $messages = [];
18+
19+
protected string $topicName;
20+
21+
public function __construct(
22+
protected string $topicKey,
23+
protected string $consumer = 'default'
24+
) {
25+
$this->topicName = KafkaFacade::topicNameByClient('consumer', $consumer, $topicKey);
26+
}
27+
28+
public function addMessage(Message $message): self
29+
{
30+
$message->err = RD_KAFKA_RESP_ERR_NO_ERROR;
31+
32+
return $this->addMessageRaw($message);
33+
}
34+
35+
public function addMessageRaw(Message $message): self
36+
{
37+
$this->messages[] = $message;
38+
39+
return $this;
40+
}
41+
42+
/**
43+
* @return void
44+
*
45+
* @throws KafkaConsumerProcessorException
46+
* @throws Throwable
47+
*/
48+
public function consume(): void
49+
{
50+
$this->bind();
51+
52+
(new ConsumerFactory(resolve(HighLevelConsumer::class)))
53+
->build($this->topicKey, $this->consumer)
54+
->listen();
55+
}
56+
57+
public function bind(): void
58+
{
59+
if (!app()->runningUnitTests()) {
60+
throw new OnlyTestingEnvironmentException('Следует использовать только в тестировании');
61+
}
62+
63+
app()->scoped(
64+
BaseKafkaManager::class,
65+
fn () => $this->makeKafkaManager()
66+
);
67+
}
68+
69+
private function makeKafkaManager(): KafkaManager
70+
{
71+
return new KafkaManager(new KafkaConsumer($this->topicName, $this->messages));
72+
}
73+
74+
public static function new(string $topicKey, string $consumer = 'default'): self
75+
{
76+
return new self($topicKey, $consumer);
77+
}
78+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Tests\Exceptions;
4+
5+
use LogicException;
6+
7+
final class OnlyTestingEnvironmentException extends LogicException
8+
{
9+
}

src/Tests/KafkaManagerFaker.php

Lines changed: 0 additions & 50 deletions
This file was deleted.

tests/Commands/KafkaConsumerCommandTest.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
<?php
22

33
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
4-
use Ensi\LaravelPhpRdKafkaConsumer\Tests\KafkaManagerFaker;
4+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
55
use Ensi\LaravelPhpRdKafkaConsumer\Tests\TestConsumer;
6+
67
use function Pest\Laravel\artisan;
78

89
use RdKafka\Message;
910

1011
test('consume command test', function () {
1112
TestConsumer::fake('test-model');
1213

13-
KafkaManagerFaker::new('test-model')
14+
ConsumerFaker::new('test-model')
1415
->addMessage($message = new Message())
1516
->bind();
1617

tests/ConsumerFakerTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
4+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\TestConsumer;
5+
use RdKafka\Message;
6+
7+
test('create and consume kafka manager fake', function () {
8+
TestConsumer::fake('test-model');
9+
10+
ConsumerFaker::new('test-model')
11+
->addMessage($message = new Message())
12+
->consume();
13+
14+
TestConsumer::assertMessageConsumed($message);
15+
});

0 commit comments

Comments
 (0)