Skip to content

Commit f4863f4

Browse files
authored
Merge pull request #1 from ensi-platform/task-83277
#83277
2 parents 56cc390 + 9dcd30e commit f4863f4

File tree

4 files changed

+80
-11
lines changed

4 files changed

+80
-11
lines changed

config/initial-event-propagation.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55
'set_initial_event_http_middleware' => [
66
'default_user_type' => '',
77

8+
/**
9+
* If is set to `true` the middleware does not override the InitialEvent if it was already set for current context earlier.
10+
* Defaults to `false`.
11+
*/
12+
'preserve_existing_event' => false,
13+
814
/**
915
* Middleware parses this header to get `appCode`.
1016
* If the header is not specified here or in a request, `appCode` is taken from `app_code` config value

src/RdKafkaConsumerMiddleware.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
namespace Ensi\LaravelInitialEventPropagation;
4+
5+
use Closure;
6+
use Ensi\InitialEventPropagation\Config;
7+
use Ensi\InitialEventPropagation\InitialEventDTO;
8+
use Ensi\InitialEventPropagation\InitialEventHolder;
9+
use RdKafka\Message;
10+
11+
class RdKafkaConsumerMiddleware
12+
{
13+
public function __construct(private InitialEventHolder $initialEventHolder)
14+
{
15+
}
16+
17+
public function handle(Message $message, Closure $next): mixed
18+
{
19+
$initialEvent = $this->extractInitialEventFromHeaders($message->headers);
20+
if ($initialEvent) {
21+
$this->initialEventHolder->setInitialEvent($initialEvent);
22+
}
23+
24+
return $next($message);
25+
}
26+
27+
protected function extractInitialEventFromHeaders(?array $headers): ?InitialEventDTO
28+
{
29+
$header = $headers[Config::REQUEST_HEADER] ?? '';
30+
31+
return $header ? InitialEventDTO::fromSerializedString($header) : null;
32+
}
33+
}

src/RdKafkaProducerMiddleware.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Ensi\LaravelInitialEventPropagation;
4+
5+
use Closure;
6+
use Ensi\InitialEventPropagation\Config;
7+
use Ensi\InitialEventPropagation\InitialEventHolder;
8+
use Ensi\LaravelPhpRdKafkaProducer\ProducerMessage;
9+
10+
class RdKafkaProducerMiddleware
11+
{
12+
public function __construct(private InitialEventHolder $initialEventHolder)
13+
{
14+
}
15+
16+
public function handle(ProducerMessage $message, Closure $next): mixed
17+
{
18+
$initialEvent = $this->initialEventHolder->getInitialEvent();
19+
if ($initialEvent) {
20+
$message->headers = $message->headers ?: [];
21+
$message->headers[Config::REQUEST_HEADER] = $initialEvent->serialize();
22+
}
23+
24+
return $next($message);
25+
}
26+
}

src/SetInitialEventHttpMiddleware.php

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,21 @@ public function handle(Request $request, Closure $next): mixed
1515
$user = $request->user();
1616
$config = config('initial-event-propagation', []);
1717
$mc = config('initial-event-propagation.set_initial_event_http_middleware', []);
18-
19-
$initialEvent = InitialEventDTO::fromScratch(
20-
userId: $user ? $user->getId() : "",
21-
userType: $user ? $mc['default_user_type'] : "",
22-
app: !empty($mc['app_code_header']) ? $request->header($mc['app_code_header']) : ($config['app_code'] ?? ''),
23-
entrypoint: $this->extractEntrypoint($request),
24-
correlationId: !empty($mc['correlation_id_header']) ? $request->header($mc['correlation_id_header']) : '',
25-
timestamp: !empty($mc['timestamp_header']) ? $request->header($mc['timestamp_header']) : ''
26-
);
27-
28-
Container::getInstance()->make(InitialEventHolder::class)->setInitialEvent($initialEvent);
18+
$holder = Container::getInstance()->make(InitialEventHolder::class);
19+
$existingEvent = $holder->getInitialEvent();
20+
21+
if (!$existingEvent || empty($mc['preserve_existing_event'])) {
22+
$initialEvent = InitialEventDTO::fromScratch(
23+
userId: $user ? $user->getId() : "",
24+
userType: $user ? $mc['default_user_type'] : "",
25+
app: !empty($mc['app_code_header']) ? $request->header($mc['app_code_header']) : ($config['app_code'] ?? ''),
26+
entrypoint: $this->extractEntrypoint($request),
27+
correlationId: !empty($mc['correlation_id_header']) ? $request->header($mc['correlation_id_header']) : '',
28+
timestamp: !empty($mc['timestamp_header']) ? $request->header($mc['timestamp_header']) : ''
29+
);
30+
31+
$holder->setInitialEvent($initialEvent);
32+
}
2933

3034
return $next($request);
3135
}

0 commit comments

Comments
 (0)