From 3639251592bbbf20f3810b023af6f78766887138 Mon Sep 17 00:00:00 2001 From: sbraconnier Date: Sat, 17 May 2025 01:07:57 -0400 Subject: [PATCH 1/2] Fix intermittent ConcurrentModificationException (#92) Signed-off-by: sbraconnier --- .../connect/donkey/model/message/Message.java | 89 ++++++++++--------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java b/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java index ffac777dbb..d2c836d11f 100644 --- a/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java +++ b/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java @@ -114,55 +114,62 @@ public Map getConnectorMessages() { } public ConnectorMessage getMergedConnectorMessage() { - if (mergedConnectorMessage == null) { - mergedConnectorMessage = new ConnectorMessage(); - mergedConnectorMessage.setChannelId(channelId); - mergedConnectorMessage.setMessageId(messageId); - mergedConnectorMessage.setServerId(serverId); - mergedConnectorMessage.setReceivedDate(receivedDate); - - Map sourceMap = null; - Map responseMap = new HashMap(); - Map channelMap = new HashMap(); - - ConnectorMessage sourceConnectorMessage = connectorMessages.get(0); - - if (sourceConnectorMessage != null) { - mergedConnectorMessage.setRaw(sourceConnectorMessage.getRaw()); - mergedConnectorMessage.setProcessedRaw(sourceConnectorMessage.getProcessedRaw()); - sourceMap = sourceConnectorMessage.getSourceMap(); - responseMap.putAll(sourceConnectorMessage.getResponseMap()); - channelMap.putAll(sourceConnectorMessage.getChannelMap()); - } - List orderedConnectorMessages = new ArrayList(connectorMessages.values()); - Collections.sort(orderedConnectorMessages, new Comparator() { - @Override - public int compare(ConnectorMessage m1, ConnectorMessage m2) { - if (m1.getChainId() == m2.getChainId()) { - return m1.getOrderId() - m2.getOrderId(); - } else { - return m1.getChainId() - m2.getChainId(); - } + if (mergedConnectorMessage != null) { + return mergedConnectorMessage; + } + + synchronized (this) { + if (mergedConnectorMessage == null) { + mergedConnectorMessage = new ConnectorMessage(); + mergedConnectorMessage.setChannelId(channelId); + mergedConnectorMessage.setMessageId(messageId); + mergedConnectorMessage.setServerId(serverId); + mergedConnectorMessage.setReceivedDate(receivedDate); + + Map sourceMap = null; + Map responseMap = new HashMap(); + Map channelMap = new HashMap(); + + ConnectorMessage sourceConnectorMessage = connectorMessages.get(0); + + if (sourceConnectorMessage != null) { + mergedConnectorMessage.setRaw(sourceConnectorMessage.getRaw()); + mergedConnectorMessage.setProcessedRaw(sourceConnectorMessage.getProcessedRaw()); + sourceMap = sourceConnectorMessage.getSourceMap(); + responseMap.putAll(sourceConnectorMessage.getResponseMap()); + channelMap.putAll(sourceConnectorMessage.getChannelMap()); } - }); - for (ConnectorMessage connectorMessage : orderedConnectorMessages) { - if (connectorMessage.getMetaDataId() > 0) { - if (sourceMap == null) { - sourceMap = connectorMessage.getSourceMap(); + List orderedConnectorMessages = new ArrayList(connectorMessages.values()); + Collections.sort(orderedConnectorMessages, new Comparator() { + @Override + public int compare(ConnectorMessage m1, ConnectorMessage m2) { + if (m1.getChainId() == m2.getChainId()) { + return m1.getOrderId() - m2.getOrderId(); + } else { + return m1.getChainId() - m2.getChainId(); + } + } + }); + + for (ConnectorMessage connectorMessage : orderedConnectorMessages) { + if (connectorMessage.getMetaDataId() > 0) { + if (sourceMap == null) { + sourceMap = connectorMessage.getSourceMap(); + } + responseMap.putAll(connectorMessage.getResponseMap()); + channelMap.putAll(connectorMessage.getChannelMap()); } - responseMap.putAll(connectorMessage.getResponseMap()); - channelMap.putAll(connectorMessage.getChannelMap()); } + + mergedConnectorMessage.setSourceMap(sourceMap); + mergedConnectorMessage.setResponseMap(responseMap); + mergedConnectorMessage.setChannelMap(channelMap); } - mergedConnectorMessage.setSourceMap(sourceMap); - mergedConnectorMessage.setResponseMap(responseMap); - mergedConnectorMessage.setChannelMap(channelMap); + return mergedConnectorMessage; } - - return mergedConnectorMessage; } public String toString() { From 565a76c2bb5f136b0884da65d2efc7a4a4bbb034 Mon Sep 17 00:00:00 2001 From: sbraconnier Date: Sat, 19 Jul 2025 10:53:57 -0400 Subject: [PATCH 2/2] Added safePutAll and use it in getMergedConnectorMessage to fix #92 Signed-off-by: sbraconnier --- .../connect/donkey/model/message/Message.java | 91 +++++++++---------- .../mirth/connect/donkey/util/MapUtil.java | 34 +++++++ 2 files changed, 77 insertions(+), 48 deletions(-) diff --git a/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java b/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java index d2c836d11f..c56dca75a6 100644 --- a/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java +++ b/donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java @@ -14,12 +14,14 @@ import java.util.Calendar; import java.util.Collections; import java.util.Comparator; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import com.mirth.connect.donkey.model.message.attachment.Attachment; +import com.mirth.connect.donkey.util.MapUtil; import com.thoughtworks.xstream.annotations.XStreamAlias; @XStreamAlias("message") @@ -114,62 +116,55 @@ public Map getConnectorMessages() { } public ConnectorMessage getMergedConnectorMessage() { + if (mergedConnectorMessage == null) { + mergedConnectorMessage = new ConnectorMessage(); + mergedConnectorMessage.setChannelId(channelId); + mergedConnectorMessage.setMessageId(messageId); + mergedConnectorMessage.setServerId(serverId); + mergedConnectorMessage.setReceivedDate(receivedDate); + + Map sourceMap = null; + Map responseMap = new HashMap(); + Map channelMap = new HashMap(); + + ConnectorMessage sourceConnectorMessage = connectorMessages.get(0); + + if (sourceConnectorMessage != null) { + mergedConnectorMessage.setRaw(sourceConnectorMessage.getRaw()); + mergedConnectorMessage.setProcessedRaw(sourceConnectorMessage.getProcessedRaw()); + sourceMap = sourceConnectorMessage.getSourceMap(); + responseMap.putAll(sourceConnectorMessage.getResponseMap()); + channelMap.putAll(sourceConnectorMessage.getChannelMap()); + } - if (mergedConnectorMessage != null) { - return mergedConnectorMessage; - } - - synchronized (this) { - if (mergedConnectorMessage == null) { - mergedConnectorMessage = new ConnectorMessage(); - mergedConnectorMessage.setChannelId(channelId); - mergedConnectorMessage.setMessageId(messageId); - mergedConnectorMessage.setServerId(serverId); - mergedConnectorMessage.setReceivedDate(receivedDate); - - Map sourceMap = null; - Map responseMap = new HashMap(); - Map channelMap = new HashMap(); - - ConnectorMessage sourceConnectorMessage = connectorMessages.get(0); - - if (sourceConnectorMessage != null) { - mergedConnectorMessage.setRaw(sourceConnectorMessage.getRaw()); - mergedConnectorMessage.setProcessedRaw(sourceConnectorMessage.getProcessedRaw()); - sourceMap = sourceConnectorMessage.getSourceMap(); - responseMap.putAll(sourceConnectorMessage.getResponseMap()); - channelMap.putAll(sourceConnectorMessage.getChannelMap()); + List orderedConnectorMessages = new ArrayList(connectorMessages.values()); + Collections.sort(orderedConnectorMessages, new Comparator() { + @Override + public int compare(ConnectorMessage m1, ConnectorMessage m2) { + if (m1.getChainId() == m2.getChainId()) { + return m1.getOrderId() - m2.getOrderId(); + } else { + return m1.getChainId() - m2.getChainId(); + } } + }); - List orderedConnectorMessages = new ArrayList(connectorMessages.values()); - Collections.sort(orderedConnectorMessages, new Comparator() { - @Override - public int compare(ConnectorMessage m1, ConnectorMessage m2) { - if (m1.getChainId() == m2.getChainId()) { - return m1.getOrderId() - m2.getOrderId(); - } else { - return m1.getChainId() - m2.getChainId(); - } - } - }); - - for (ConnectorMessage connectorMessage : orderedConnectorMessages) { - if (connectorMessage.getMetaDataId() > 0) { - if (sourceMap == null) { - sourceMap = connectorMessage.getSourceMap(); - } - responseMap.putAll(connectorMessage.getResponseMap()); - channelMap.putAll(connectorMessage.getChannelMap()); + for (ConnectorMessage connectorMessage : orderedConnectorMessages) { + if (connectorMessage.getMetaDataId() > 0) { + if (sourceMap == null) { + sourceMap = connectorMessage.getSourceMap(); } + MapUtil.safePutAll(responseMap, connectorMessage.getResponseMap(), 5); + MapUtil.safePutAll(channelMap, connectorMessage.getChannelMap(), 5); } - - mergedConnectorMessage.setSourceMap(sourceMap); - mergedConnectorMessage.setResponseMap(responseMap); - mergedConnectorMessage.setChannelMap(channelMap); } - return mergedConnectorMessage; + mergedConnectorMessage.setSourceMap(sourceMap); + mergedConnectorMessage.setResponseMap(responseMap); + mergedConnectorMessage.setChannelMap(channelMap); } + + return mergedConnectorMessage; } public String toString() { diff --git a/donkey/src/main/java/com/mirth/connect/donkey/util/MapUtil.java b/donkey/src/main/java/com/mirth/connect/donkey/util/MapUtil.java index d13a3485c3..363176603d 100644 --- a/donkey/src/main/java/com/mirth/connect/donkey/util/MapUtil.java +++ b/donkey/src/main/java/com/mirth/connect/donkey/util/MapUtil.java @@ -9,6 +9,7 @@ package com.mirth.connect.donkey.util; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -132,4 +133,37 @@ public static Map deserializeMapWithInvalidValues(Serializer ser return map; } + + /** + * Tries to safely put all entries from a possibly concurrently modified source map into the + * target map. Retries up to the specified maxAttempts if ConcurrentModificationException is + * thrown, sleeping 10ms between retries. + * + * @param target + * The destination map + * @param source + * The source map that might be modified concurrently + * @param maxAttempts + * The maximum number of retries + * @return true if the putAll succeeded, false if it failed after retries + */ + public static boolean safePutAll(Map target, Map source, final int maxAttempts) { + + int attempts = 0; + while (attempts++ < maxAttempts) { + try { + target.putAll(source); + return true; + } catch (ConcurrentModificationException e) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } + } + } + + return false; + } }