Skip to content

Commit f68bd16

Browse files
committed
[add] Добавлены недостающие тесты по созданию Consumer
1 parent d0185de commit f68bd16

File tree

6 files changed

+62
-13
lines changed

6 files changed

+62
-13
lines changed

src/Commands/KafkaConsumeCommand.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,14 @@ public function handle(ConsumerFactory $consumerFactory): int
8484
{
8585
try {
8686
$this->consumer = $consumerFactory
87-
->build($this->getTopicKey(), $this->getConsumer());
87+
->build($this->getTopicKey(), $this->getConsumer())
88+
->setMaxEvents($this->getMaxEvents())
89+
->setMaxTime($this->getMaxTime());
8890

8991
$this->info("Start listening to topic: \"{$this->getTopicKey()}\"" .
9092
" ({$this->consumer->getTopicName()}), consumer \"{$this->getConsumer()}\"");
9193

92-
$this->consumer
93-
->setMaxEvents($this->getMaxEvents())
94-
->setMaxTime($this->getMaxTime())
95-
->listen();
94+
$this->consumer->listen();
9695
} catch (Throwable $exception) {
9796
$this->errorThrowable($exception);
9897

src/Consumers/Consumer.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,22 @@ public function forceStop(): void
4242
$this->highLevelConsumer->forceStop();
4343
}
4444

45+
/**
46+
* @return ProcessorData
47+
*/
48+
public function getProcessorData(): ProcessorData
49+
{
50+
return $this->processorData;
51+
}
52+
53+
/**
54+
* @return ConsumerOptions
55+
*/
56+
public function getConsumerOptions(): ConsumerOptions
57+
{
58+
return $this->consumerOptions;
59+
}
60+
4561
/**
4662
* @return void
4763
*

src/Consumers/Factories/ConsumerFactory.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,10 @@ protected function makeConsumerOptions(string $consumer, ProcessorData $processo
9999

100100
protected function collectMiddleware(array $processorMiddleware): array
101101
{
102-
return array_unique(
103-
array_merge(
104-
config('kafka-consumer.global_middleware', []),
105-
$processorMiddleware
106-
)
107-
);
102+
return collect(config('kafka-consumer.global_middleware', []))
103+
->merge($processorMiddleware)
104+
->unique()
105+
->values()
106+
->toArray();
108107
}
109108
}

tests/ConsumerFakerTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
<?php
22

3+
use Ensi\LaravelPhpRdKafka\KafkaManager;
34
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
45
use Ensi\LaravelPhpRdKafkaConsumer\Tests\TestConsumer;
56
use RdKafka\Message;
@@ -13,3 +14,14 @@
1314

1415
TestConsumer::assertMessageConsumed($message);
1516
});
17+
18+
test('bind testing kafka manager with faker consumer', function () {
19+
TestConsumer::fake('test-model');
20+
21+
ConsumerFaker::new('test-model')
22+
->addMessage(new Message())
23+
->bind();
24+
25+
expect(resolve(KafkaManager::class))
26+
->toBeInstanceOf(\Ensi\LaravelPhpRdKafkaConsumer\Tests\KafkaManager::class);
27+
});

tests/Consumers/ConsumerTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ class: TestConsumer::class,
3333

3434
$consumer->listen();
3535

36-
expect($consumerOptions->maxEvents)
36+
expect($consumer->getConsumerOptions()->maxEvents)
3737
->toBe(10000)
38-
->and($consumerOptions->maxTime)
38+
->and($consumer->getConsumerOptions()->maxTime)
3939
->toBe(5100);
4040

4141
$highLevelConsumer->shouldHaveReceived('for', ['default']);

tests/Consumers/Factories/ConsumerFactoryTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,26 @@
2323
(new ConsumerFactory(resolve(HighLevelConsumer::class)))
2424
->build('test-models');
2525
})->throws(KafkaConsumerProcessorException::class, 'Invalid processor type "invalid-type", supported types are: action,job');
26+
27+
test('set consume_timeout and middleware to consumer options', function () {
28+
config()
29+
->set('kafka-consumer.consumer_options.default', [
30+
'consume_timeout' => 55000,
31+
'middleware' => ['AlreadyAddedMiddleware', 'KafkaConsumerMiddleware'],
32+
]);
33+
34+
config()
35+
->set('kafka-consumer.global_middleware', ['AlreadyAddedMiddleware']);
36+
37+
setConsumerTopicConfig('test-models', TestConsumer::class);
38+
39+
$consumer = (new ConsumerFactory(resolve(HighLevelConsumer::class)))
40+
->build('test-models');
41+
42+
$consumerOptions = $consumer->getConsumerOptions();
43+
44+
expect($consumerOptions->consumeTimeout)
45+
->toBe(55000)
46+
->and($consumerOptions->middleware)
47+
->toBe(['AlreadyAddedMiddleware', 'KafkaConsumerMiddleware']);
48+
});

0 commit comments

Comments
 (0)