Skip to content

Commit 84e9f38

Browse files
committed
GH-855 - Support to add headers in event externalization.
EventExternalizationConfiguration now exposes a ….headers(Class<T>, Function<T, Map<String, Object>) to allow to define a function that extracts headers from the event that are supposed to added to the message to be sent out. The Kafka and AMQP implementations have been augmented to consider those configurations. Furthermore, if the mapping step prior to the externalization creates a Spring Message<?>, we add routing information as fallback and send it out as is.
1 parent 7478347 commit 84e9f38

File tree

10 files changed

+193
-28
lines changed

10 files changed

+193
-28
lines changed

spring-modulith-events/spring-modulith-events-amqp/src/main/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ DelegatingEventExternalizer rabbitEventExternalizer(EventExternalizationConfigur
6262
return new DelegatingEventExternalizer(configuration, (target, payload) -> {
6363

6464
var routing = BrokerRouting.of(target, context);
65+
var headers = configuration.getHeadersFor(payload);
6566

66-
operations.convertAndSend(routing.getTarget(), routing.getKey(payload), payload);
67+
operations.convertAndSend(routing.getTarget(), routing.getKey(payload), payload, headers);
6768

6869
return CompletableFuture.completedFuture(null);
6970
});

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/DefaultEventExternalizationConfiguration.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.modulith.events;
1717

18+
import java.util.Map;
1819
import java.util.function.Function;
1920
import java.util.function.Predicate;
2021

@@ -31,16 +32,18 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
3132
private final Predicate<Object> filter;
3233
private final Function<Object, Object> mapper;
3334
private final Function<Object, RoutingTarget> router;
35+
private final Function<Object, Map<String, Object>> headers;
3436

3537
/**
3638
* Creates a new {@link DefaultEventExternalizationConfiguration}
3739
*
3840
* @param filter must not be {@literal null}.
3941
* @param mapper must not be {@literal null}.
4042
* @param router must not be {@literal null}.
43+
* @param headers must not be {@literal null}.
4144
*/
4245
DefaultEventExternalizationConfiguration(Predicate<Object> filter, Function<Object, Object> mapper,
43-
Function<Object, RoutingTarget> router) {
46+
Function<Object, RoutingTarget> router, Function<Object, Map<String, Object>> headers) {
4447

4548
Assert.notNull(filter, "Filter must not be null!");
4649
Assert.notNull(mapper, "Mapper must not be null!");
@@ -49,6 +52,7 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
4952
this.filter = filter;
5053
this.mapper = mapper;
5154
this.router = router;
55+
this.headers = headers;
5256
}
5357

5458
/**
@@ -95,4 +99,16 @@ public RoutingTarget determineTarget(Object event) {
9599

96100
return router.apply(event).verify();
97101
}
102+
103+
/*
104+
* (non-Javadoc)
105+
* @see org.springframework.modulith.events.EventExternalizationConfiguration#getHeadersFor(java.lang.Object)
106+
*/
107+
@Override
108+
public Map<String, Object> getHeadersFor(Object event) {
109+
110+
Assert.notNull(event, "Event must not be null!");
111+
112+
return headers.apply(event);
113+
}
98114
}

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventExternalizationConfiguration.java

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.lang.annotation.Annotation;
2121
import java.util.Collection;
22+
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.Optional;
2426
import java.util.function.BiFunction;
2527
import java.util.function.BiPredicate;
@@ -188,6 +190,15 @@ public static Function<Object, RoutingTarget> byFullyQualifiedTypeName() {
188190
*/
189191
RoutingTarget determineTarget(Object event);
190192

193+
/**
194+
* Returns the headers to be attached to the message sent out for the given event.
195+
*
196+
* @param event must not be {@literal null}.
197+
* @return will never be {@literal null}.
198+
* @since 1.3
199+
*/
200+
Map<String, Object> getHeadersFor(Object event);
201+
191202
/**
192203
* API to define which events are supposed to be selected for externalization.
193204
*
@@ -367,23 +378,28 @@ public static class Router {
367378
private final Predicate<Object> filter;
368379
private final Function<Object, Object> mapper;
369380
private final Function<Object, RoutingTarget> router;
381+
private final Function<Object, Map<String, Object>> headers;
370382

371383
/**
372384
* Creates a new {@link Router} for the given selector {@link Predicate} and mapper and router {@link Function}s.
373385
*
374386
* @param filter must not be {@literal null}.
375387
* @param mapper must not be {@literal null}.
376388
* @param router must not be {@literal null}.
389+
* @param headers must not be {@literal null}.
377390
*/
378-
Router(Predicate<Object> filter, Function<Object, Object> mapper, Function<Object, RoutingTarget> router) {
391+
Router(Predicate<Object> filter, Function<Object, Object> mapper, Function<Object, RoutingTarget> router,
392+
Function<Object, Map<String, Object>> headers) {
379393

380394
Assert.notNull(filter, "Selector must not be null!");
381395
Assert.notNull(mapper, "Mapper must not be null!");
382396
Assert.notNull(router, "Router must not be null!");
397+
Assert.notNull(headers, "Headers extractor must not be null!");
383398

384399
this.filter = filter;
385400
this.mapper = mapper;
386401
this.router = router;
402+
this.headers = headers;
387403
}
388404

389405
/**
@@ -392,7 +408,7 @@ public static class Router {
392408
* @param filter must not be {@literal null}.
393409
*/
394410
Router(Predicate<Object> filter) {
395-
this(filter, Function.identity(), DEFAULT_ROUTER);
411+
this(filter, Function.identity(), DEFAULT_ROUTER, it -> Collections.emptyMap());
396412
}
397413

398414
/**
@@ -406,7 +422,7 @@ public Router mapping(Function<Object, Object> mapper) {
406422

407423
Assert.notNull(mapper, "Mapper must not be null!");
408424

409-
return new Router(filter, mapper, router);
425+
return new Router(filter, mapper, router, headers);
410426
}
411427

412428
/**
@@ -428,7 +444,42 @@ public <T> Router mapping(Class<T> type, Function<T, Object> mapper) {
428444
.map(mapper::apply)
429445
.orElse(it);
430446

431-
return new Router(filter, this.mapper.compose(combined), router);
447+
return new Router(filter, this.mapper.compose(combined), router, headers);
448+
}
449+
450+
/**
451+
* Registers the given function to extract headers from the events to be externalized. Will reset the entire header
452+
* extractor arrangement. For type-specific extractions, see {@link #headers(Class, Function)}.
453+
*
454+
* @param extractor must not be {@literal null}.
455+
* @return will never be {@literal null}.
456+
* @see #headers(Class, Function)
457+
* @since 1.3
458+
*/
459+
public Router headers(Function<Object, Map<String, Object>> extractor) {
460+
461+
Assert.notNull(extractor, "Headers extractor must not be null!");
462+
463+
return new Router(filter, mapper, router, extractor);
464+
}
465+
466+
/**
467+
* Registers the given type-specific function to extract headers from the events to be externalized.
468+
*
469+
* @param extractor must not be {@literal null}.
470+
* @return will never be {@literal null}.
471+
* @since 1.3
472+
*/
473+
public <T> Router headers(Class<T> type, Function<T, Map<String, Object>> extractor) {
474+
475+
Assert.notNull(type, "Type must not be null!");
476+
Assert.notNull(extractor, "Headers extractor must not be null!");
477+
478+
Function<Object, Map<String, Object>> combined = it -> toOptional(type, it)
479+
.map(extractor::apply)
480+
.orElseGet(() -> this.headers.apply(it));
481+
482+
return new Router(filter, mapper, router, combined);
432483
}
433484

434485
/**
@@ -437,7 +488,7 @@ public <T> Router mapping(Class<T> type, Function<T, Object> mapper) {
437488
* @return will never be {@literal null}.
438489
*/
439490
public Router routeMapped() {
440-
return new Router(filter, mapper, router.compose(mapper));
491+
return new Router(filter, mapper, router.compose(mapper), headers);
441492
}
442493

443494
/**
@@ -450,7 +501,7 @@ public Router routeAll(Function<Object, RoutingTarget> router) {
450501

451502
Assert.notNull(router, "Router must not be null!");
452503

453-
return new Router(filter, mapper, router);
504+
return new Router(filter, mapper, router, headers);
454505
}
455506

456507
/**
@@ -466,9 +517,11 @@ public <T> Router route(Class<T> type, Function<T, RoutingTarget> router) {
466517
Assert.notNull(type, "Type must not be null!");
467518
Assert.notNull(router, "Router must not be null!");
468519

469-
return new Router(filter, mapper, it -> toOptional(type, it)
520+
Function<Object, RoutingTarget> adapted = it -> toOptional(type, it)
470521
.map(router::apply)
471-
.orElseGet(() -> this.router.apply(it)));
522+
.orElseGet(() -> this.router.apply(it));
523+
524+
return new Router(filter, mapper, adapted, headers);
472525
}
473526

474527
/**
@@ -487,9 +540,11 @@ public <T> Router routeKey(Class<T> type, Function<T, String> extractor) {
487540
Assert.notNull(type, "Type must not be null!");
488541
Assert.notNull(extractor, "Extractor must not be null!");
489542

490-
return new Router(filter, mapper, it -> toOptional(type, it)
543+
Function<Object, RoutingTarget> adapted = it -> toOptional(type, it)
491544
.map(t -> this.router.apply(t).withKey(extractor.apply(t)))
492-
.orElseGet(() -> this.router.apply(it)));
545+
.orElseGet(() -> this.router.apply(it));
546+
547+
return new Router(filter, mapper, adapted, headers);
493548
}
494549

495550
/**
@@ -503,9 +558,9 @@ public EventExternalizationConfiguration routeOptional(Function<Object, Optional
503558

504559
Assert.notNull(router, "Router must not be null!");
505560

506-
return new Router(filter, mapper, it -> router.apply(it)
507-
.orElseGet(() -> this.router.apply(it)))
508-
.build();
561+
Function<Object, RoutingTarget> adapted = it -> router.apply(it).orElseGet(() -> this.router.apply(it));
562+
563+
return new Router(filter, mapper, adapted, headers).build();
509564
}
510565

511566
/**
@@ -533,16 +588,16 @@ public Router routeAllByType(Function<Class<?>, RoutingTarget> router) {
533588

534589
Assert.notNull(router, "Router must not be null!");
535590

536-
return new Router(filter, mapper, it -> router.apply(it.getClass()));
591+
return new Router(filter, mapper, it -> router.apply(it.getClass()), headers);
537592
}
538593

539594
/**
540-
* Creates a new {@link EventExternalizationConfiguration} refelcting the current configuration.
595+
* Creates a new {@link EventExternalizationConfiguration} reflecting the current configuration.
541596
*
542597
* @return will never be {@literal null}.
543598
*/
544599
public EventExternalizationConfiguration build() {
545-
return new DefaultEventExternalizationConfiguration(filter, mapper, router);
600+
return new DefaultEventExternalizationConfiguration(filter, mapper, router, headers);
546601
}
547602

548603
private static <T> Optional<T> toOptional(Class<T> type, Object source) {

spring-modulith-events/spring-modulith-events-api/src/test/java/org/springframework/modulith/events/EventExternalizationConfigurationUnitTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import java.lang.annotation.Retention;
2424
import java.lang.annotation.RetentionPolicy;
25+
import java.util.List;
26+
import java.util.Map;
2527

2628
import org.junit.jupiter.api.Test;
2729

@@ -130,6 +132,21 @@ void defaultSetup() {
130132
assertThat(target.getKey()).isNull();
131133
}
132134

135+
@Test // GH-855
136+
void registersHeaderExtractor() {
137+
138+
var configuration = defaults("org.springframework.modulith")
139+
.headers(AnotherSampleEvent.class, it -> Map.of("another", "anotherValue"))
140+
.headers(SampleEvent.class, it -> Map.of("sample", "value"))
141+
.build();
142+
143+
assertThat(configuration.getHeadersFor(new SampleEvent()))
144+
.containsEntry("sample", "value");
145+
146+
assertThat(configuration.getHeadersFor(new AnotherSampleEvent()))
147+
.containsEntry("another", "anotherValue");
148+
}
149+
133150
@Retention(RetentionPolicy.RUNTIME)
134151
@interface CustomExternalized {
135152
String value() default "";

spring-modulith-events/spring-modulith-events-kafka/src/main/java/org/springframework/modulith/events/kafka/KafkaEventExternalizerConfiguration.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.springframework.expression.spel.support.StandardEvaluationContext;
2828
import org.springframework.kafka.core.KafkaOperations;
2929
import org.springframework.kafka.core.KafkaTemplate;
30+
import org.springframework.kafka.support.KafkaHeaders;
31+
import org.springframework.messaging.Message;
32+
import org.springframework.messaging.support.MessageBuilder;
3033
import org.springframework.modulith.events.EventExternalizationConfiguration;
3134
import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration;
3235
import org.springframework.modulith.events.support.BrokerRouting;
@@ -61,7 +64,17 @@ DelegatingEventExternalizer kafkaEventExternalizer(EventExternalizationConfigura
6164
return new DelegatingEventExternalizer(configuration, (target, payload) -> {
6265

6366
var routing = BrokerRouting.of(target, context);
64-
return operations.send(routing.getTarget(), routing.getKey(payload), payload);
67+
68+
var builder = payload instanceof Message<?> message
69+
? MessageBuilder.fromMessage(message)
70+
: MessageBuilder.withPayload(payload).copyHeaders(configuration.getHeadersFor(payload));
71+
72+
var message = builder
73+
.setHeaderIfAbsent(KafkaHeaders.KEY, routing.getKey(payload))
74+
.setHeaderIfAbsent(KafkaHeaders.TOPIC, routing.getTarget())
75+
.build();
76+
77+
return operations.send(message);
6578
});
6679
}
6780
}

spring-modulith-events/spring-modulith-events-kafka/src/main/java/org/springframework/modulith/events/kafka/KafkaJacksonConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2323
import org.springframework.context.annotation.Bean;
2424
import org.springframework.context.annotation.PropertySource;
25+
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
2526
import org.springframework.kafka.support.converter.JsonMessageConverter;
2627

2728
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,7 +42,7 @@ class KafkaJacksonConfiguration {
4142
@Bean
4243
@ConditionalOnBean(ObjectMapper.class)
4344
@ConditionalOnMissingBean(JsonMessageConverter.class)
44-
JsonMessageConverter jsonMessageConverter(ObjectMapper mapper) {
45-
return new JsonMessageConverter(mapper);
45+
ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper mapper) {
46+
return new ByteArrayJsonMessageConverter(mapper);
4647
}
4748
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
1+
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
22
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

0 commit comments

Comments
 (0)