Skip to content

Commit 565a76c

Browse files
committed
Added safePutAll and use it in getMergedConnectorMessage to fix #92
Signed-off-by: sbraconnier <simonbraconnier@gmail.com>
1 parent 3639251 commit 565a76c

File tree

2 files changed

+77
-48
lines changed

2 files changed

+77
-48
lines changed

donkey/src/main/java/com/mirth/connect/donkey/model/message/Message.java

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
import java.util.Calendar;
1515
import java.util.Collections;
1616
import java.util.Comparator;
17+
import java.util.ConcurrentModificationException;
1718
import java.util.HashMap;
1819
import java.util.LinkedHashMap;
1920
import java.util.List;
2021
import java.util.Map;
2122

2223
import com.mirth.connect.donkey.model.message.attachment.Attachment;
24+
import com.mirth.connect.donkey.util.MapUtil;
2325
import com.thoughtworks.xstream.annotations.XStreamAlias;
2426

2527
@XStreamAlias("message")
@@ -114,62 +116,55 @@ public Map<Integer, ConnectorMessage> getConnectorMessages() {
114116
}
115117

116118
public ConnectorMessage getMergedConnectorMessage() {
119+
if (mergedConnectorMessage == null) {
120+
mergedConnectorMessage = new ConnectorMessage();
121+
mergedConnectorMessage.setChannelId(channelId);
122+
mergedConnectorMessage.setMessageId(messageId);
123+
mergedConnectorMessage.setServerId(serverId);
124+
mergedConnectorMessage.setReceivedDate(receivedDate);
125+
126+
Map<String, Object> sourceMap = null;
127+
Map<String, Object> responseMap = new HashMap<String, Object>();
128+
Map<String, Object> channelMap = new HashMap<String, Object>();
129+
130+
ConnectorMessage sourceConnectorMessage = connectorMessages.get(0);
131+
132+
if (sourceConnectorMessage != null) {
133+
mergedConnectorMessage.setRaw(sourceConnectorMessage.getRaw());
134+
mergedConnectorMessage.setProcessedRaw(sourceConnectorMessage.getProcessedRaw());
135+
sourceMap = sourceConnectorMessage.getSourceMap();
136+
responseMap.putAll(sourceConnectorMessage.getResponseMap());
137+
channelMap.putAll(sourceConnectorMessage.getChannelMap());
138+
}
117139

118-
if (mergedConnectorMessage != null) {
119-
return mergedConnectorMessage;
120-
}
121-
122-
synchronized (this) {
123-
if (mergedConnectorMessage == null) {
124-
mergedConnectorMessage = new ConnectorMessage();
125-
mergedConnectorMessage.setChannelId(channelId);
126-
mergedConnectorMessage.setMessageId(messageId);
127-
mergedConnectorMessage.setServerId(serverId);
128-
mergedConnectorMessage.setReceivedDate(receivedDate);
129-
130-
Map<String, Object> sourceMap = null;
131-
Map<String, Object> responseMap = new HashMap<String, Object>();
132-
Map<String, Object> channelMap = new HashMap<String, Object>();
133-
134-
ConnectorMessage sourceConnectorMessage = connectorMessages.get(0);
135-
136-
if (sourceConnectorMessage != null) {
137-
mergedConnectorMessage.setRaw(sourceConnectorMessage.getRaw());
138-
mergedConnectorMessage.setProcessedRaw(sourceConnectorMessage.getProcessedRaw());
139-
sourceMap = sourceConnectorMessage.getSourceMap();
140-
responseMap.putAll(sourceConnectorMessage.getResponseMap());
141-
channelMap.putAll(sourceConnectorMessage.getChannelMap());
140+
List<ConnectorMessage> orderedConnectorMessages = new ArrayList<ConnectorMessage>(connectorMessages.values());
141+
Collections.sort(orderedConnectorMessages, new Comparator<ConnectorMessage>() {
142+
@Override
143+
public int compare(ConnectorMessage m1, ConnectorMessage m2) {
144+
if (m1.getChainId() == m2.getChainId()) {
145+
return m1.getOrderId() - m2.getOrderId();
146+
} else {
147+
return m1.getChainId() - m2.getChainId();
148+
}
142149
}
150+
});
143151

144-
List<ConnectorMessage> orderedConnectorMessages = new ArrayList<ConnectorMessage>(connectorMessages.values());
145-
Collections.sort(orderedConnectorMessages, new Comparator<ConnectorMessage>() {
146-
@Override
147-
public int compare(ConnectorMessage m1, ConnectorMessage m2) {
148-
if (m1.getChainId() == m2.getChainId()) {
149-
return m1.getOrderId() - m2.getOrderId();
150-
} else {
151-
return m1.getChainId() - m2.getChainId();
152-
}
153-
}
154-
});
155-
156-
for (ConnectorMessage connectorMessage : orderedConnectorMessages) {
157-
if (connectorMessage.getMetaDataId() > 0) {
158-
if (sourceMap == null) {
159-
sourceMap = connectorMessage.getSourceMap();
160-
}
161-
responseMap.putAll(connectorMessage.getResponseMap());
162-
channelMap.putAll(connectorMessage.getChannelMap());
152+
for (ConnectorMessage connectorMessage : orderedConnectorMessages) {
153+
if (connectorMessage.getMetaDataId() > 0) {
154+
if (sourceMap == null) {
155+
sourceMap = connectorMessage.getSourceMap();
163156
}
157+
MapUtil.safePutAll(responseMap, connectorMessage.getResponseMap(), 5);
158+
MapUtil.safePutAll(channelMap, connectorMessage.getChannelMap(), 5);
164159
}
165-
166-
mergedConnectorMessage.setSourceMap(sourceMap);
167-
mergedConnectorMessage.setResponseMap(responseMap);
168-
mergedConnectorMessage.setChannelMap(channelMap);
169160
}
170161

171-
return mergedConnectorMessage;
162+
mergedConnectorMessage.setSourceMap(sourceMap);
163+
mergedConnectorMessage.setResponseMap(responseMap);
164+
mergedConnectorMessage.setChannelMap(channelMap);
172165
}
166+
167+
return mergedConnectorMessage;
173168
}
174169

175170
public String toString() {

donkey/src/main/java/com/mirth/connect/donkey/util/MapUtil.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package com.mirth.connect.donkey.util;
1111

12+
import java.util.ConcurrentModificationException;
1213
import java.util.HashMap;
1314
import java.util.Map;
1415
import java.util.Map.Entry;
@@ -132,4 +133,37 @@ public static Map<String, Object> deserializeMapWithInvalidValues(Serializer ser
132133

133134
return map;
134135
}
136+
137+
/**
138+
* Tries to safely put all entries from a possibly concurrently modified source map into the
139+
* target map. Retries up to the specified maxAttempts if ConcurrentModificationException is
140+
* thrown, sleeping 10ms between retries.
141+
*
142+
* @param target
143+
* The destination map
144+
* @param source
145+
* The source map that might be modified concurrently
146+
* @param maxAttempts
147+
* The maximum number of retries
148+
* @return true if the putAll succeeded, false if it failed after retries
149+
*/
150+
public static <K, V> boolean safePutAll(Map<K, V> target, Map<K, V> source, final int maxAttempts) {
151+
152+
int attempts = 0;
153+
while (attempts++ < maxAttempts) {
154+
try {
155+
target.putAll(source);
156+
return true;
157+
} catch (ConcurrentModificationException e) {
158+
try {
159+
Thread.sleep(10);
160+
} catch (InterruptedException ie) {
161+
Thread.currentThread().interrupt();
162+
return false;
163+
}
164+
}
165+
}
166+
167+
return false;
168+
}
135169
}

0 commit comments

Comments
 (0)