Skip to content

Commit 9082ca2

Browse files
committed
GH-118 - Reworked infrastructure to complete event publications.
Previously we used a dedicated BeanPostProcessor to decorate beans that expose transactional event listeners with an interceptor to mark event publications as completed if the method invocation completes successfully. We now have simplified the arrangement by using Spring's auto-proxy creation and registering an Advisor implementation. This allows us to properly and reliably order the interceptor between the async one (goes before ours) and the transactional one (goes after ours). Also, this significantly simplifies the implementation of the interceptor as we can assume only commit listener methods being intercepted in the first place.
1 parent 6f5de96 commit 9082ca2

File tree

8 files changed

+340
-275
lines changed

8 files changed

+340
-275
lines changed

spring-modulith-events/spring-modulith-events-core/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@
3232
<groupId>org.springframework</groupId>
3333
<artifactId>spring-aop</artifactId>
3434
</dependency>
35+
36+
<!-- Test -->
37+
38+
<dependency>
39+
<groupId>org.springframework</groupId>
40+
<artifactId>spring-test</artifactId>
41+
<scope>test</scope>
42+
</dependency>
3543

3644
<!-- Logging -->
3745
<dependency>

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationConfiguration.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
package org.springframework.modulith.events.config;
1717

1818
import org.springframework.beans.factory.ObjectFactory;
19+
import org.springframework.beans.factory.config.BeanDefinition;
1920
import org.springframework.context.annotation.Bean;
2021
import org.springframework.context.annotation.Configuration;
22+
import org.springframework.context.annotation.Role;
2123
import org.springframework.modulith.events.DefaultEventPublicationRegistry;
2224
import org.springframework.modulith.events.EventPublicationRegistry;
2325
import org.springframework.modulith.events.EventPublicationRepository;
24-
import org.springframework.modulith.events.support.CompletionRegisteringBeanPostProcessor;
26+
import org.springframework.modulith.events.support.CompletionRegisteringAdvisor;
2527
import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster;
2628

2729
/**
@@ -45,7 +47,8 @@ PersistentApplicationEventMulticaster applicationEventMulticaster(
4547
}
4648

4749
@Bean
48-
static CompletionRegisteringBeanPostProcessor bpp(ObjectFactory<EventPublicationRegistry> store) {
49-
return new CompletionRegisteringBeanPostProcessor(store::getObject);
50+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
51+
static CompletionRegisteringAdvisor completionRegisteringAdvisor(ObjectFactory<EventPublicationRegistry> registry) {
52+
return new CompletionRegisteringAdvisor(registry::getObject);
5053
}
5154
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.support;
17+
18+
import java.lang.reflect.Method;
19+
import java.util.function.Supplier;
20+
21+
import org.aopalliance.aop.Advice;
22+
import org.aopalliance.intercept.MethodInterceptor;
23+
import org.aopalliance.intercept.MethodInvocation;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import org.springframework.aop.Advisor;
27+
import org.springframework.aop.MethodMatcher;
28+
import org.springframework.aop.Pointcut;
29+
import org.springframework.aop.support.AbstractPointcutAdvisor;
30+
import org.springframework.aop.support.StaticMethodMatcher;
31+
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
32+
import org.springframework.core.Ordered;
33+
import org.springframework.core.annotation.AnnotatedElementUtils;
34+
import org.springframework.lang.NonNull;
35+
import org.springframework.modulith.events.EventPublicationRegistry;
36+
import org.springframework.modulith.events.PublicationTargetIdentifier;
37+
import org.springframework.transaction.event.TransactionPhase;
38+
import org.springframework.transaction.event.TransactionalApplicationListenerMethodAdapter;
39+
import org.springframework.transaction.event.TransactionalEventListener;
40+
import org.springframework.util.Assert;
41+
import org.springframework.util.ConcurrentLruCache;
42+
43+
/**
44+
* An {@link Advisor} to decorate {@link TransactionalEventListener} annotated methods to mark the previously registered
45+
* event publications as completed on successful method execution.
46+
*
47+
* @author Oliver Drotbohm
48+
*/
49+
public class CompletionRegisteringAdvisor extends AbstractPointcutAdvisor {
50+
51+
private static final long serialVersionUID = 5649563426118669238L;
52+
53+
private final Pointcut pointcut;
54+
private final Advice advice;
55+
56+
/**
57+
* Creates a new {@link CompletionRegisteringAdvisor} for the given {@link EventPublicationRegistry}.
58+
*
59+
* @param registry must not be {@literal null}.
60+
*/
61+
public CompletionRegisteringAdvisor(Supplier<EventPublicationRegistry> registry) {
62+
63+
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
64+
65+
this.pointcut = new AnnotationMatchingPointcut(null, TransactionalEventListener.class, true) {
66+
67+
/*
68+
* (non-Javadoc)
69+
* @see org.springframework.aop.support.annotation.AnnotationMatchingPointcut#getMethodMatcher()
70+
*/
71+
@Override
72+
public MethodMatcher getMethodMatcher() {
73+
return new CommitListenerMethodMatcher(super.getMethodMatcher());
74+
}
75+
};
76+
77+
this.advice = new CompletionRegisteringMethodInterceptor(registry);
78+
}
79+
80+
/*
81+
* (non-Javadoc)
82+
* @see org.springframework.aop.PointcutAdvisor#getPointcut()
83+
*/
84+
public Pointcut getPointcut() {
85+
return pointcut;
86+
}
87+
88+
/*
89+
* (non-Javadoc)
90+
* @see org.springframework.aop.Advisor#getAdvice()
91+
*/
92+
@Override
93+
public Advice getAdvice() {
94+
return advice;
95+
}
96+
97+
/**
98+
* An adapter for a delegating {@link MethodMatcher} to verify the
99+
*
100+
* @author Oliver Drotbohm
101+
*/
102+
private static class CommitListenerMethodMatcher extends StaticMethodMatcher {
103+
104+
private final MethodMatcher delegate;
105+
106+
/**
107+
* Creates a new {@link CommitListenerMethodMatcher} with the given delegate {@link MethodMatcher}.
108+
*
109+
* @param delegate must not be {@literal null}.
110+
*/
111+
public CommitListenerMethodMatcher(MethodMatcher delegate) {
112+
this.delegate = delegate;
113+
}
114+
115+
/*
116+
* (non-Javadoc)
117+
* @see org.springframework.aop.MethodMatcher#matches(java.lang.reflect.Method, java.lang.Class)
118+
*/
119+
@Override
120+
public boolean matches(Method method, Class<?> targetClass) {
121+
122+
if (!delegate.matches(method, targetClass)) {
123+
return false;
124+
}
125+
126+
var annotation = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
127+
128+
return annotation != null && annotation.phase().equals(TransactionPhase.AFTER_COMMIT);
129+
}
130+
}
131+
132+
/**
133+
* {@link MethodInterceptor} to trigger the completion of an event publication after a transaction event listener
134+
* method has been completed successfully.
135+
*
136+
* @author Oliver Drotbohm
137+
*/
138+
static class CompletionRegisteringMethodInterceptor implements MethodInterceptor, Ordered {
139+
140+
private static final Logger LOG = LoggerFactory.getLogger(CompletionRegisteringMethodInterceptor.class);
141+
142+
private static final ConcurrentLruCache<Method, TransactionalApplicationListenerMethodAdapter> ADAPTERS = new ConcurrentLruCache<>(
143+
100, CompletionRegisteringMethodInterceptor::createAdapter);
144+
145+
private final @NonNull Supplier<EventPublicationRegistry> registry;
146+
147+
/**
148+
* Creates a new {@link CompletionRegisteringMethodInterceptor} for the given {@link EventPublicationRegistry}.
149+
*
150+
* @param registry must not be {@literal null}.
151+
*/
152+
CompletionRegisteringMethodInterceptor(Supplier<EventPublicationRegistry> registry) {
153+
154+
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
155+
156+
this.registry = registry;
157+
}
158+
159+
/*
160+
* (non-Javadoc)
161+
* @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation)
162+
*/
163+
@Override
164+
public Object invoke(MethodInvocation invocation) throws Throwable {
165+
166+
Object result = null;
167+
var method = invocation.getMethod();
168+
169+
try {
170+
result = invocation.proceed();
171+
} catch (Exception o_O) {
172+
173+
if (LOG.isDebugEnabled()) {
174+
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
175+
} else {
176+
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
177+
method, o_O.getMessage());
178+
}
179+
180+
return result;
181+
}
182+
183+
// Mark publication complete if the method is a transactional event listener.
184+
String adapterId = ADAPTERS.get(method).getListenerId();
185+
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
186+
registry.get().markCompleted(invocation.getArguments()[0], identifier);
187+
188+
return result;
189+
}
190+
191+
/*
192+
* (non-Javadoc)
193+
* @see org.springframework.core.Ordered#getOrder()
194+
*/
195+
@Override
196+
public int getOrder() {
197+
return Ordered.HIGHEST_PRECEDENCE + 10;
198+
}
199+
200+
private static TransactionalApplicationListenerMethodAdapter createAdapter(Method method) {
201+
return new TransactionalApplicationListenerMethodAdapter(null, method.getDeclaringClass(), method);
202+
}
203+
}
204+
}

0 commit comments

Comments
 (0)