Skip to content

Commit 3373b2b

Browse files
committed
Транспорт для Symfony Messenger посредством Битрикс D7
1 parent 437d5a3 commit 3373b2b

File tree

10 files changed

+810
-3
lines changed

10 files changed

+810
-3
lines changed

DependencyInjection/BitrixOrdinaryToolsExtension.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
1010
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
1111
use Symfony\Component\Notifier\NotifierInterface;
12+
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1213

1314
/**
1415
* Class BitrixOrdinaryToolsExtension
@@ -41,6 +42,12 @@ public function load(array $configs, ContainerBuilder $container) : void
4142
$loader->load('notifier_transports.yaml');
4243
}
4344

45+
// Битриксовый транспорт для Messenger подключается только
46+
// если Messenger в наличии.
47+
if (class_exists(SenderInterface::class)) {
48+
$loader->load('bitrix_transport.yaml');
49+
}
50+
4451
if (!class_exists('Prokl\CustomFrameworkExtensionsBundle\DependencyInjection\CustomFrameworkExtensionsExtension')) {
4552
throw new LogicException(
4653
'Чтобы использовать Твиг нужно установить и активировать core.framework.extension.bundle.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
services:
2+
_defaults:
3+
autowire: true
4+
autoconfigure: true
5+
public: true
6+
7+
# Транспорт для Messenger через Битрикс D7.
8+
messenger.transport.bitrix.factory:
9+
public: true
10+
autoconfigure: false
11+
class: Prokl\BitrixOrdinaryToolsBundle\Services\Messenger\Bitrix\BitrixTransportFactory
12+
tags: ['messenger.transport_factory']
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
namespace Prokl\BitrixOrdinaryToolsBundle\Services\Messenger\Bitrix;
4+
5+
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
6+
7+
/**
8+
* Class BitrixReceivedStamp
9+
* @package Prokl\BitrixOrdinaryToolsBundle\Services\Messenger\Bitrix
10+
*
11+
* @internal Форк из https://github.com/bsidev/bitrix-queue.
12+
*/
13+
class BitrixReceivedStamp implements NonSendableStampInterface
14+
{
15+
/**
16+
* @var integer $id
17+
*/
18+
private $id;
19+
20+
/**
21+
* BitrixReceivedStamp constructor.
22+
*
23+
* @param integer $id
24+
*/
25+
public function __construct(int $id)
26+
{
27+
$this->id = $id;
28+
}
29+
30+
/**
31+
* @return integer
32+
*/
33+
public function getId(): int
34+
{
35+
return $this->id;
36+
}
37+
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
<?php
2+
3+
namespace Prokl\BitrixOrdinaryToolsBundle\Services\Messenger\Bitrix;
4+
5+
use Exception;
6+
use Symfony\Component\Messenger\Envelope;
7+
use Symfony\Component\Messenger\Exception\LogicException;
8+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
9+
use Symfony\Component\Messenger\Exception\TransportException;
10+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
11+
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
12+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
13+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
14+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
15+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
16+
17+
/**
18+
* Class BitrixReceiver
19+
* @package Prokl\BitrixOrdinaryToolsBundle\Services\Messenger\Bitrix
20+
*
21+
* @internal Форк из https://github.com/bsidev/bitrix-queue.
22+
*/
23+
class BitrixReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
24+
{
25+
private const MAX_RETRIES = 3;
26+
27+
/**
28+
* @var integer $retryingSafetyCounter
29+
*/
30+
private $retryingSafetyCounter = 0;
31+
32+
/**
33+
* @var Connection $connection
34+
*/
35+
private $connection;
36+
37+
/**
38+
* @var SerializerInterface $serializer Сериалайзер.
39+
*/
40+
private $serializer;
41+
42+
/**
43+
* BitrixReceiver constructor.
44+
*
45+
* @param Connection $connection
46+
* @param SerializerInterface|null $serializer
47+
*/
48+
public function __construct(Connection $connection, SerializerInterface $serializer = null)
49+
{
50+
$this->connection = $connection;
51+
$this->serializer = $serializer ?? new PhpSerializer();
52+
}
53+
54+
/**
55+
* {@inheritdoc}
56+
*/
57+
public function get(): iterable
58+
{
59+
try {
60+
$bitrixEnvelope = $this->connection->get();
61+
$this->retryingSafetyCounter = 0;
62+
} catch (TransportException $exception) {
63+
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
64+
$this->retryingSafetyCounter = 0;
65+
throw new TransportException($exception->getMessage(), 0, $exception);
66+
}
67+
68+
return [];
69+
}
70+
71+
if ($bitrixEnvelope === null) {
72+
return [];
73+
}
74+
75+
return [$this->createEnvelopeFromData($bitrixEnvelope)];
76+
}
77+
78+
/**
79+
* {@inheritdoc}
80+
*/
81+
public function ack(Envelope $envelope): void
82+
{
83+
$this->connection->ack($this->findBitrixReceivedStamp($envelope)->getId());
84+
}
85+
86+
/**
87+
* {@inheritdoc}
88+
*/
89+
public function reject(Envelope $envelope): void
90+
{
91+
$this->connection->reject($this->findBitrixReceivedStamp($envelope)->getId());
92+
}
93+
94+
/**
95+
* {@inheritdoc}
96+
*/
97+
public function getMessageCount(): int
98+
{
99+
return $this->connection->getMessageCount();
100+
}
101+
102+
/**
103+
* {@inheritdoc}
104+
* @throws Exception
105+
*/
106+
public function all(int $limit = null): iterable
107+
{
108+
$bitrixEnvelopes = $this->connection->findAll($limit);
109+
110+
foreach ($bitrixEnvelopes as $bitrixEnvelope) {
111+
yield $this->createEnvelopeFromData($bitrixEnvelope);
112+
}
113+
}
114+
115+
/**
116+
* {@inheritdoc}
117+
* @throws Exception
118+
*/
119+
public function find($id): ?Envelope
120+
{
121+
$bitrixEnvelope = $this->connection->find($id);
122+
123+
if ($bitrixEnvelope === null) {
124+
return null;
125+
}
126+
127+
return $this->createEnvelopeFromData($bitrixEnvelope);
128+
}
129+
130+
/**
131+
* @param Envelope $envelope
132+
*
133+
* @return BitrixReceivedStamp
134+
* @throws LogicException
135+
*/
136+
private function findBitrixReceivedStamp(Envelope $envelope): BitrixReceivedStamp
137+
{
138+
/** @var BitrixReceivedStamp|null $bitrixReceivedStamp */
139+
$bitrixReceivedStamp = $envelope->last(BitrixReceivedStamp::class);
140+
141+
if ($bitrixReceivedStamp === null) {
142+
throw new LogicException('No BitrixReceivedStamp found on the Envelope.');
143+
}
144+
145+
return $bitrixReceivedStamp;
146+
}
147+
148+
/**
149+
* @param array $data Данные.
150+
*
151+
* @return Envelope
152+
* @throws Exception
153+
*/
154+
private function createEnvelopeFromData(array $data): Envelope
155+
{
156+
try {
157+
$envelope = $this->serializer->decode([
158+
'body' => $data['BODY'],
159+
'headers' => $data['HEADERS'],
160+
]);
161+
} catch (MessageDecodingFailedException $exception) {
162+
$this->connection->reject((int) $data['ID']);
163+
164+
throw $exception;
165+
}
166+
167+
return $envelope->with(
168+
new BitrixReceivedStamp((int) $data['ID']),
169+
new TransportMessageIdStamp((int) $data['ID'])
170+
);
171+
}
172+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php
2+
3+
namespace Prokl\BitrixOrdinaryToolsBundle\Services\Messenger\Bitrix;
4+
5+
use Symfony\Component\Messenger\Envelope;
6+
use Symfony\Component\Messenger\Stamp\DelayStamp;
7+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
8+
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
9+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
10+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
11+
12+
/**
13+
* Class BitrixSender
14+
* @package Local\Services\Messanger
15+
*
16+
* @internal Форк из https://github.com/bsidev/bitrix-queue.
17+
*/
18+
class BitrixSender implements SenderInterface
19+
{
20+
/**
21+
* @var Connection $connection
22+
*/
23+
private $connection;
24+
25+
/**
26+
* @var SerializerInterface $serializer
27+
*/
28+
private $serializer;
29+
30+
/**
31+
* BitrixSender constructor.
32+
*
33+
* @param Connection $connection
34+
* @param SerializerInterface|null $serializer
35+
*/
36+
public function __construct(Connection $connection, SerializerInterface $serializer = null)
37+
{
38+
$this->connection = $connection;
39+
$this->serializer = $serializer ?? new PhpSerializer();
40+
}
41+
42+
/**
43+
* {@inheritdoc}
44+
*/
45+
public function send(Envelope $envelope): Envelope
46+
{
47+
$encodedMessage = $this->serializer->encode($envelope);
48+
49+
/** @var DelayStamp|null $delayStamp */
50+
$delayStamp = $envelope->last(DelayStamp::class);
51+
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
52+
53+
$id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
54+
55+
return $envelope->with(new TransportMessageIdStamp($id));
56+
}
57+
}

0 commit comments

Comments
 (0)