Skip to content

Commit 08a5c3d

Browse files
committed
GH-1370 - Support for serialized event externalization.
1 parent 3672073 commit 08a5c3d

File tree

8 files changed

+258
-21
lines changed

8 files changed

+258
-21
lines changed

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/DefaultEventExternalizationConfiguration.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
3333
private final Function<Object, Object> mapper;
3434
private final Function<Object, RoutingTarget> router;
3535
private final Function<Object, Map<String, Object>> headers;
36+
private final boolean serializeExternalization;
3637

3738
/**
3839
* Creates a new {@link DefaultEventExternalizationConfiguration}
@@ -43,7 +44,8 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
4344
* @param headers must not be {@literal null}.
4445
*/
4546
DefaultEventExternalizationConfiguration(Predicate<Object> filter, Function<Object, Object> mapper,
46-
Function<Object, RoutingTarget> router, Function<Object, Map<String, Object>> headers) {
47+
Function<Object, RoutingTarget> router, Function<Object, Map<String, Object>> headers,
48+
boolean serializeExternalization) {
4749

4850
Assert.notNull(filter, "Filter must not be null!");
4951
Assert.notNull(mapper, "Mapper must not be null!");
@@ -53,6 +55,7 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
5355
this.mapper = mapper;
5456
this.router = router;
5557
this.headers = headers;
58+
this.serializeExternalization = serializeExternalization;
5659
}
5760

5861
/**
@@ -111,4 +114,13 @@ public Map<String, Object> getHeadersFor(Object event) {
111114

112115
return headers.apply(event);
113116
}
117+
118+
/*
119+
* (non-Javadoc)
120+
* @see org.springframework.modulith.events.EventExternalizationConfiguration#serializeExternalization()
121+
*/
122+
@Override
123+
public boolean serializeExternalization() {
124+
return serializeExternalization;
125+
}
114126
}

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventExternalizationConfiguration.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,15 @@ public static Function<Object, RoutingTarget> byFullyQualifiedTypeName() {
199199
*/
200200
Map<String, Object> getHeadersFor(Object event);
201201

202+
/**
203+
* Returns whether the event externalization should be serialized to make sure the broker sees the events in the order
204+
* they were published in the application. By default, this is not guaranteed as multiple threads might trigger events
205+
* and the externalization of one event might overtake the one of another.
206+
*
207+
* @since 2.0
208+
*/
209+
boolean serializeExternalization();
210+
202211
/**
203212
* API to define which events are supposed to be selected for externalization.
204213
*
@@ -372,13 +381,26 @@ private static <T extends Annotation> T findAnnotation(Object event, Class<T> an
372381
}
373382
}
374383

384+
public static abstract class BaseConfiguration<T extends BaseConfiguration<T>> {
385+
386+
protected boolean serializeExternalization;
387+
388+
@SuppressWarnings("unchecked")
389+
public T serializeExternalization(boolean serializeExternalization) {
390+
391+
this.serializeExternalization = serializeExternalization;
392+
393+
return (T) this;
394+
}
395+
}
396+
375397
/**
376398
* API to define the event routing.
377399
*
378400
* @author Oliver Drotbohm
379401
* @since 1.1
380402
*/
381-
public static class Router {
403+
public static class Router extends BaseConfiguration<Router> {
382404

383405
private static final Function<Object, RoutingTarget> DEFAULT_ROUTER = it -> {
384406
return mergeWithExternalizedAnnotation(it, byFullyQualifiedTypeName().apply(it));
@@ -398,7 +420,7 @@ public static class Router {
398420
* @param headers must not be {@literal null}.
399421
*/
400422
Router(Predicate<Object> filter, Function<Object, Object> mapper, Function<Object, RoutingTarget> router,
401-
Function<Object, Map<String, Object>> headers) {
423+
Function<Object, Map<String, Object>> headers, boolean serializeExternalization) {
402424

403425
Assert.notNull(filter, "Selector must not be null!");
404426
Assert.notNull(mapper, "Mapper must not be null!");
@@ -409,6 +431,7 @@ public static class Router {
409431
this.mapper = mapper;
410432
this.router = router;
411433
this.headers = headers;
434+
this.serializeExternalization = serializeExternalization;
412435
}
413436

414437
/**
@@ -417,7 +440,7 @@ public static class Router {
417440
* @param filter must not be {@literal null}.
418441
*/
419442
Router(Predicate<Object> filter) {
420-
this(filter, Function.identity(), DEFAULT_ROUTER, it -> Collections.emptyMap());
443+
this(filter, Function.identity(), DEFAULT_ROUTER, it -> Collections.emptyMap(), false);
421444
}
422445

423446
/**
@@ -431,7 +454,7 @@ public Router mapping(Function<Object, Object> mapper) {
431454

432455
Assert.notNull(mapper, "Mapper must not be null!");
433456

434-
return new Router(filter, mapper, router, headers);
457+
return new Router(filter, mapper, router, headers, serializeExternalization);
435458
}
436459

437460
/**
@@ -453,7 +476,7 @@ public <T> Router mapping(Class<T> type, Function<T, Object> mapper) {
453476
.map(mapper::apply)
454477
.orElse(it);
455478

456-
return new Router(filter, this.mapper.compose(combined), router, headers);
479+
return new Router(filter, this.mapper.compose(combined), router, headers, serializeExternalization);
457480
}
458481

459482
/**
@@ -469,7 +492,7 @@ public Router headers(Function<Object, Map<String, Object>> extractor) {
469492

470493
Assert.notNull(extractor, "Headers extractor must not be null!");
471494

472-
return new Router(filter, mapper, router, extractor);
495+
return new Router(filter, mapper, router, extractor, serializeExternalization);
473496
}
474497

475498
/**
@@ -488,7 +511,7 @@ public <T> Router headers(Class<T> type, Function<T, Map<String, Object>> extrac
488511
.map(extractor::apply)
489512
.orElseGet(() -> this.headers.apply(it));
490513

491-
return new Router(filter, mapper, router, combined);
514+
return new Router(filter, mapper, router, combined, serializeExternalization);
492515
}
493516

494517
/**
@@ -497,7 +520,7 @@ public <T> Router headers(Class<T> type, Function<T, Map<String, Object>> extrac
497520
* @return will never be {@literal null}.
498521
*/
499522
public Router routeMapped() {
500-
return new Router(filter, mapper, router.compose(mapper), headers);
523+
return new Router(filter, mapper, router.compose(mapper), headers, serializeExternalization);
501524
}
502525

503526
/**
@@ -510,7 +533,7 @@ public Router routeAll(Function<Object, RoutingTarget> router) {
510533

511534
Assert.notNull(router, "Router must not be null!");
512535

513-
return new Router(filter, mapper, router, headers);
536+
return new Router(filter, mapper, router, headers, serializeExternalization);
514537
}
515538

516539
/**
@@ -530,7 +553,7 @@ public <T> Router route(Class<T> type, Function<T, RoutingTarget> router) {
530553
.map(router::apply)
531554
.orElseGet(() -> this.router.apply(it));
532555

533-
return new Router(filter, mapper, adapted, headers);
556+
return new Router(filter, mapper, adapted, headers, serializeExternalization);
534557
}
535558

536559
/**
@@ -553,7 +576,7 @@ public <T> Router routeKey(Class<T> type, Function<T, String> extractor) {
553576
.map(t -> this.router.apply(t).withKey(extractor.apply(t)))
554577
.orElseGet(() -> this.router.apply(it));
555578

556-
return new Router(filter, mapper, adapted, headers);
579+
return new Router(filter, mapper, adapted, headers, serializeExternalization);
557580
}
558581

559582
/**
@@ -569,7 +592,7 @@ public EventExternalizationConfiguration routeOptional(Function<Object, Optional
569592

570593
Function<Object, RoutingTarget> adapted = it -> router.apply(it).orElseGet(() -> this.router.apply(it));
571594

572-
return new Router(filter, mapper, adapted, headers).build();
595+
return new Router(filter, mapper, adapted, headers, serializeExternalization).build();
573596
}
574597

575598
/**
@@ -597,7 +620,7 @@ public Router routeAllByType(Function<Class<?>, RoutingTarget> router) {
597620

598621
Assert.notNull(router, "Router must not be null!");
599622

600-
return new Router(filter, mapper, it -> router.apply(it.getClass()), headers);
623+
return new Router(filter, mapper, it -> router.apply(it.getClass()), headers, serializeExternalization);
601624
}
602625

603626
/**
@@ -606,7 +629,7 @@ public Router routeAllByType(Function<Class<?>, RoutingTarget> router) {
606629
* @return will never be {@literal null}.
607630
*/
608631
public EventExternalizationConfiguration build() {
609-
return new DefaultEventExternalizationConfiguration(filter, mapper, router, headers);
632+
return new DefaultEventExternalizationConfiguration(filter, mapper, router, headers, serializeExternalization);
610633
}
611634

612635
private static <T> Optional<T> toOptional(Class<T> type, Object source) {

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventExternalizationAutoConfiguration.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.springframework.context.annotation.Role;
3232
import org.springframework.context.event.EventListenerFactory;
3333
import org.springframework.core.Ordered;
34+
import org.springframework.core.env.Environment;
3435
import org.springframework.modulith.events.EventExternalizationConfiguration;
3536
import org.springframework.modulith.events.core.ConditionalEventListener;
3637
import org.springframework.transaction.event.TransactionalApplicationListenerMethodAdapter;
@@ -61,14 +62,24 @@ static EventListenerFactory filteringEventListenerFactory(EventExternalizationCo
6162
@Bean
6263
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
6364
@ConditionalOnMissingBean
64-
static EventExternalizationConfiguration eventExternalizationConfiguration(BeanFactory factory) {
65+
static EventExternalizationConfiguration eventExternalizationConfiguration(BeanFactory factory,
66+
Environment environment) {
6567

6668
var packages = AutoConfigurationPackages.get(factory);
6769

68-
LOG.debug("Configuring event externalization to export events annotated with @Externalized in packages: {}",
69-
packages);
70+
var configuration = EventExternalizationConfiguration.defaults(packages);
7071

71-
return EventExternalizationConfiguration.defaults(packages).build();
72+
var serialize = environment.getProperty("spring.modulith.events.externalization.serialize-externalization",
73+
boolean.class);
74+
75+
if (serialize != null) {
76+
configuration = configuration.serializeExternalization(serialize);
77+
}
78+
79+
LOG.debug("Configuring {}event externalization to export events annotated with @Externalized in packages: {}",
80+
Boolean.TRUE.equals(serialize) ? "serialized " : "", packages);
81+
82+
return configuration.build();
7283
}
7384

7485
/**

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/EventExternalizationSupport.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.modulith.events.support;
1717

1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.Semaphore;
1920

2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ abstract class EventExternalizationSupport implements ConditionalEventListener {
3940
private static final Logger logger = LoggerFactory.getLogger(EventExternalizationSupport.class);
4041

4142
private final EventExternalizationConfiguration configuration;
43+
private final Semaphore semaphore = new Semaphore(1);
4244

4345
/**
4446
* Creates a new {@link EventExternalizationSupport} for the given {@link EventExternalizationConfiguration}.
@@ -85,8 +87,9 @@ public CompletableFuture<?> externalize(Object event) {
8587
logger.debug("Externalizing event of type {} to {}.", event.getClass(), target);
8688
}
8789

88-
return externalize(mapped, target)
89-
.thenApply(it -> new EventExternalized<>(event, mapped, target, it));
90+
return configuration.serializeExternalization()
91+
? doExternalizeSerialized(event, mapped, target)
92+
: doExternalize(event, mapped, target);
9093
}
9194

9295
/**
@@ -97,4 +100,26 @@ public CompletableFuture<?> externalize(Object event) {
97100
* @return the externalization result, will never be {@literal null}.
98101
*/
99102
protected abstract CompletableFuture<?> externalize(Object payload, RoutingTarget target);
103+
104+
private CompletableFuture<?> doExternalizeSerialized(Object event, Object mapped, RoutingTarget target) {
105+
106+
try {
107+
108+
semaphore.acquire();
109+
110+
return doExternalize(event, mapped, target)
111+
.whenComplete((__, ___) -> semaphore.release());
112+
113+
} catch (InterruptedException o_O) {
114+
115+
semaphore.release();
116+
throw new RuntimeException(o_O);
117+
}
118+
}
119+
120+
private CompletableFuture<?> doExternalize(Object event, Object mapped, RoutingTarget target) {
121+
122+
return externalize(mapped, target)
123+
.thenApply(it -> new EventExternalized<>(event, mapped, target, it));
124+
}
100125
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.config;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
22+
import org.springframework.boot.autoconfigure.AutoConfigurations;
23+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
24+
import org.springframework.modulith.events.EventExternalizationConfiguration;
25+
26+
/**
27+
* Integration tests for {@link EventExternalizationAutoConfiguration}.
28+
*
29+
* @author Oliver Drotbohm
30+
* @since 2.0
31+
*/
32+
class EventExternalizationAutoConfigurationIntegrationTests {
33+
34+
@Test // GH-1370
35+
void doesNotExternalizationSerializationByDefault() {
36+
assertSerializationEnabled(null, false);
37+
}
38+
39+
@Test // GH-1370
40+
void configurationPropertyEnablesOrDisablesSerialization() {
41+
42+
assertSerializationEnabled(true, true);
43+
assertSerializationEnabled(false, false);
44+
}
45+
46+
private static void assertSerializationEnabled(Boolean propertyValue, boolean expected) {
47+
48+
var runner = new ApplicationContextRunner()
49+
.withUserConfiguration(SampleApplication.class)
50+
.withConfiguration(AutoConfigurations.of(EventExternalizationAutoConfiguration.class));
51+
52+
if (propertyValue != null) {
53+
54+
runner = runner
55+
.withPropertyValues("spring.modulith.events.externalization.serialize-externalization=" + propertyValue);
56+
}
57+
58+
runner.run(ctx -> {
59+
60+
assertThat(ctx).getBean(EventExternalizationConfiguration.class).satisfies(it -> {
61+
assertThat(it.serializeExternalization()).isEqualTo(expected);
62+
});
63+
});
64+
}
65+
66+
@AutoConfigurationPackage
67+
static class SampleApplication {}
68+
}

0 commit comments

Comments
 (0)