Skip to content

Commit 4947e3e

Browse files
artembilangaryrussell
authored andcommitted
GH-3454: From MQTT conversion error - to error ch (#3456)
* GH-3454: From MQTT conversion error - to error ch Fixes #3454 The message converter may return null when we try to covert from the MQTT message. The thrown exception may also reset the client connect. * Fix `MqttPahoMessageDrivenChannelAdapter` to catch any conversion errors (including `null` result) and try to send an `ErrorMessage` with that info into the provided `errorChannel`. Otherwise re-throw it as as **Cherry-pick to `5.4.x` & `5.3.x`** * * Apply review language-specific changes
1 parent 41da462 commit 4947e3e

File tree

3 files changed

+99
-23
lines changed

3 files changed

+99
-23
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,6 +40,8 @@
4040
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4141
import org.springframework.messaging.Message;
4242
import org.springframework.messaging.MessagingException;
43+
import org.springframework.messaging.converter.MessageConversionException;
44+
import org.springframework.messaging.support.GenericMessage;
4345
import org.springframework.util.Assert;
4446

4547
/**
@@ -182,7 +184,7 @@ protected void doStart() {
182184
}
183185
catch (Exception e) {
184186
logger.error("Exception while connecting and subscribing, retrying", e);
185-
this.scheduleReconnect();
187+
scheduleReconnect();
186188
}
187189
}
188190

@@ -249,7 +251,7 @@ public void removeTopic(String... topic) {
249251
super.removeTopic(topic);
250252
}
251253
catch (MqttException e) {
252-
throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(topic), e);
254+
throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), e);
253255
}
254256
finally {
255257
this.topicLock.unlock();
@@ -378,19 +380,52 @@ public synchronized void connectionLost(Throwable cause) {
378380

379381
@Override
380382
public void messageArrived(String topic, MqttMessage mqttMessage) {
381-
AbstractIntegrationMessageBuilder<?> builder = getConverter().toMessageBuilder(topic, mqttMessage);
382-
if (this.manualAcks) {
383-
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
384-
new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
383+
AbstractIntegrationMessageBuilder<?> builder = toMessageBuilder(topic, mqttMessage);
384+
if (builder != null) {
385+
if (this.manualAcks) {
386+
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
387+
new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
388+
}
389+
Message<?> message = builder.build();
390+
try {
391+
sendMessage(message);
392+
}
393+
catch (RuntimeException ex) {
394+
logger.error("Unhandled exception for " + message.toString(), ex);
395+
throw ex;
396+
}
385397
}
386-
Message<?> message = builder.build();
398+
}
399+
400+
private AbstractIntegrationMessageBuilder<?> toMessageBuilder(String topic, MqttMessage mqttMessage) {
401+
AbstractIntegrationMessageBuilder<?> builder = null;
402+
Exception conversionError = null;
387403
try {
388-
sendMessage(message);
404+
builder = getConverter().toMessageBuilder(topic, mqttMessage);
389405
}
390-
catch (RuntimeException e) {
391-
logger.error("Unhandled exception for " + message.toString(), e);
392-
throw e;
406+
catch (Exception ex) {
407+
conversionError = ex;
408+
}
409+
410+
if (builder == null && conversionError == null) {
411+
conversionError = new IllegalStateException("'MqttMessageConverter' returned 'null'");
412+
}
413+
414+
if (conversionError != null) {
415+
GenericMessage<MqttMessage> message = new GenericMessage<>(mqttMessage);
416+
if (!sendErrorMessageIfNecessary(message, conversionError)) {
417+
MessageConversionException conversionException;
418+
if (conversionError instanceof MessageConversionException) {
419+
conversionException = (MessageConversionException) conversionError;
420+
}
421+
else {
422+
conversionException = new MessageConversionException(message, "Failed to convert from MQTT Message",
423+
conversionError);
424+
}
425+
throw conversionException;
426+
}
393427
}
428+
return builder;
394429
}
395430

396431
@Override

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -87,9 +87,13 @@
8787
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
8888
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
8989
import org.springframework.integration.mqtt.support.MqttHeaderAccessor;
90+
import org.springframework.integration.mqtt.support.MqttMessageConverter;
91+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
9092
import org.springframework.integration.test.util.TestUtils;
9193
import org.springframework.messaging.Message;
9294
import org.springframework.messaging.MessageHandlingException;
95+
import org.springframework.messaging.MessageHeaders;
96+
import org.springframework.messaging.support.ErrorMessage;
9397
import org.springframework.messaging.support.GenericMessage;
9498
import org.springframework.scheduling.TaskScheduler;
9599
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -104,7 +108,7 @@
104108
*/
105109
public class MqttAdapterTests {
106110

107-
private IMqttToken alwaysComplete;
111+
private final IMqttToken alwaysComplete;
108112

109113
{
110114
ProxyFactoryBean pfb = new ProxyFactoryBean();
@@ -196,12 +200,12 @@ public void testOutboundOptionsApplied() throws Exception {
196200
return deliveryToken;
197201
}).given(client).publish(anyString(), any(MqttMessage.class));
198202

199-
handler.handleMessage(new GenericMessage<String>("Hello, world!"));
203+
handler.handleMessage(new GenericMessage<>("Hello, world!"));
200204

201205
verify(client, times(1)).connect(any(MqttConnectOptions.class));
202206
assertThat(connectCalled.get()).isTrue();
203207
AtomicReference<Object> failed = new AtomicReference<>();
204-
handler.setApplicationEventPublisher(event -> failed.set(event));
208+
handler.setApplicationEventPublisher(failed::set);
205209
handler.connectionLost(new IllegalStateException());
206210
assertThat(failed.get()).isInstanceOf(MqttConnectionFailedEvent.class);
207211
handler.stop();
@@ -256,7 +260,7 @@ public void testInboundOptionsApplied() throws Exception {
256260
return null;
257261
}).given(client).connect(any(MqttConnectOptions.class));
258262

259-
final AtomicReference<MqttCallback> callback = new AtomicReference<MqttCallback>();
263+
final AtomicReference<MqttCallback> callback = new AtomicReference<>();
260264
willAnswer(invocation -> {
261265
callback.set(invocation.getArgument(0));
262266
return null;
@@ -269,12 +273,14 @@ public void testInboundOptionsApplied() throws Exception {
269273
adapter.setManualAcks(true);
270274
QueueChannel outputChannel = new QueueChannel();
271275
adapter.setOutputChannel(outputChannel);
276+
QueueChannel errorChannel = new QueueChannel();
277+
adapter.setErrorChannel(errorChannel);
272278
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
273279
taskScheduler.initialize();
274280
adapter.setTaskScheduler(taskScheduler);
275281
adapter.setBeanFactory(mock(BeanFactory.class));
276282
ApplicationEventPublisher applicationEventPublisher = mock(ApplicationEventPublisher.class);
277-
final BlockingQueue<MqttIntegrationEvent> events = new LinkedBlockingQueue<MqttIntegrationEvent>();
283+
final BlockingQueue<MqttIntegrationEvent> events = new LinkedBlockingQueue<>();
278284
willAnswer(invocation -> {
279285
events.add(invocation.getArgument(0));
280286
return null;
@@ -302,6 +308,39 @@ public void testInboundOptionsApplied() throws Exception {
302308
assertThat(event).isInstanceOf(MqttSubscribedEvent.class);
303309
assertThat(((MqttSubscribedEvent) event).getMessage()).isEqualTo("Connected and subscribed to [baz, fix]");
304310

311+
adapter.setConverter(new MqttMessageConverter() {
312+
313+
@Override public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
314+
return null;
315+
}
316+
317+
@Override public AbstractIntegrationMessageBuilder<?> toMessageBuilder(String topic,
318+
MqttMessage mqttMessage) {
319+
320+
return null;
321+
}
322+
323+
@Override public Object fromMessage(Message<?> message, Class<?> targetClass) {
324+
return null;
325+
}
326+
327+
@Override public Message<?> toMessage(Object payload, MessageHeaders headers) {
328+
return null;
329+
}
330+
331+
332+
});
333+
334+
callback.get().messageArrived("baz", message);
335+
336+
ErrorMessage errorMessage = (ErrorMessage) errorChannel.receive(0);
337+
assertThat(errorMessage).isNotNull()
338+
.extracting(Message::getPayload)
339+
.isInstanceOf(IllegalStateException.class);
340+
IllegalStateException exception = (IllegalStateException) errorMessage.getPayload();
341+
assertThat(exception).hasMessage("'MqttMessageConverter' returned 'null'");
342+
assertThat(errorMessage.getOriginalMessage().getPayload()).isSameAs(message);
343+
305344
// lose connection and make first reconnect fail
306345
failConnection.set(true);
307346
RuntimeException e = new RuntimeException("foo");
@@ -424,7 +463,7 @@ public void testReconnect() throws Exception {
424463
// the following assertion should be equalTo, but leq to protect against a slow CI server
425464
assertThat(attemptingReconnectCount.get()).isLessThanOrEqualTo(2);
426465
AtomicReference<Object> failed = new AtomicReference<>();
427-
adapter.setApplicationEventPublisher(event -> failed.set(event));
466+
adapter.setApplicationEventPublisher(failed::set);
428467
adapter.connectionLost(new IllegalStateException());
429468
assertThat(failed.get()).isInstanceOf(MqttConnectionFailedEvent.class);
430469
adapter.stop();
@@ -456,8 +495,7 @@ public void testSubscribeFailure() throws Exception {
456495
new DirectFieldAccessor(client).setPropertyValue("aClient", aClient);
457496
willAnswer(new CallsRealMethods()).given(client).connect(any(MqttConnectOptions.class));
458497
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class));
459-
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class),
460-
(IMqttMessageListener[]) isNull());
498+
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class), isNull());
461499
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
462500

463501
IMqttToken token = mock(IMqttToken.class);
@@ -562,7 +600,7 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
562600
return adapter;
563601
}
564602

565-
private MqttPahoMessageHandler buildAdapterOut(final IMqttAsyncClient client) throws MqttException {
603+
private MqttPahoMessageHandler buildAdapterOut(final IMqttAsyncClient client) {
566604
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
567605

568606
@Override

src/reference/asciidoc/mqtt.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ Starting with version 5.3, you can set the `manualAcks` property to true.
154154
Often used to asynchronously acknowledge delivery.
155155
When set to `true`, header (`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`) is added to the message with the value being a `SimpleAcknowledgment`.
156156
You must invoke the `acknowledge()` method to complete the delivery.
157-
See the Javadocs for `IMqppClient` `setManualAcks()` and `messageArrivedComplete()` for more information.
157+
See the Javadocs for `IMqttClient` `setManualAcks()` and `messageArrivedComplete()` for more information.
158158
For convenience a header accessor is provided:
159159

160160
====
@@ -164,6 +164,9 @@ StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
164164
----
165165
====
166166

167+
Starting with version `5.2.11`, when the message converter throws an exception or returns `null` from the `MqttMessage` conversion, the `MqttPahoMessageDrivenChannelAdapter` sends an `ErrorMessage` into the `errorChannel`, if provided.
168+
Re-throws this conversion error otherwise into an MQTT client callback.
169+
167170
==== Configuring with Java Configuration
168171

169172
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:

0 commit comments

Comments
 (0)