Skip to content

Commit 0e5c8c8

Browse files
authored
Made start/stop of SocketInitiator/SocketAcceptor thread-safe. (#324)
* Made iterator removal thread-safe. - This could throw ConcurrentModificationExceptions when initiator/acceptor was started/stopped from different threads. * prevent concurrent access to start()/stop() method * allow access to either stop() or start() at the same time
1 parent 690afa9 commit 0e5c8c8

File tree

4 files changed

+45
-41
lines changed

4 files changed

+45
-41
lines changed

quickfixj-core/src/main/java/quickfix/SocketAcceptor.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package quickfix;
2121

22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import quickfix.mina.EventHandlingStrategy;
2324
import quickfix.mina.SingleThreadedEventHandlingStrategy;
2425
import quickfix.mina.acceptor.AbstractSocketAcceptor;
@@ -28,7 +29,7 @@
2829
* sessions.
2930
*/
3031
public class SocketAcceptor extends AbstractSocketAcceptor {
31-
private volatile Boolean isStarted = Boolean.FALSE;
32+
private final AtomicBoolean isStarted = new AtomicBoolean(false);
3233
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;
3334

3435
private SocketAcceptor(Builder builder) throws ConfigError {
@@ -103,30 +104,30 @@ public void start() throws ConfigError, RuntimeError {
103104
}
104105

105106
private void initialize() throws ConfigError {
106-
if (isStarted.equals(Boolean.FALSE)) {
107-
eventHandlingStrategy.setExecutor(longLivedExecutor);
108-
startAcceptingConnections();
109-
eventHandlingStrategy.blockInThread();
110-
isStarted = Boolean.TRUE;
111-
} else {
112-
log.warn("Ignored attempt to start already running SocketAcceptor.");
107+
synchronized (isStarted) {
108+
if (isStarted.compareAndSet(false, true)) {
109+
eventHandlingStrategy.setExecutor(longLivedExecutor);
110+
startAcceptingConnections();
111+
eventHandlingStrategy.blockInThread();
112+
}
113113
}
114114
}
115115

116116
@Override
117117
public void stop(boolean forceDisconnect) {
118-
if (isStarted.equals(Boolean.TRUE)) {
119-
try {
120-
logoutAllSessions(forceDisconnect);
121-
stopAcceptingConnections();
122-
stopSessionTimer();
123-
} finally {
118+
synchronized (isStarted) {
119+
if (isStarted.compareAndSet(true, false)) {
124120
try {
125-
eventHandlingStrategy.stopHandlingMessages(true);
121+
logoutAllSessions(forceDisconnect);
122+
stopAcceptingConnections();
123+
stopSessionTimer();
126124
} finally {
127-
Session.unregisterSessions(getSessions(), true);
128-
clearConnectorSessions();
129-
isStarted = Boolean.FALSE;
125+
try {
126+
eventHandlingStrategy.stopHandlingMessages(true);
127+
} finally {
128+
Session.unregisterSessions(getSessions(), true);
129+
clearConnectorSessions();
130+
}
130131
}
131132
}
132133
}

quickfixj-core/src/main/java/quickfix/SocketInitiator.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package quickfix;
2121

22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import quickfix.mina.EventHandlingStrategy;
2324
import quickfix.mina.SingleThreadedEventHandlingStrategy;
2425
import quickfix.mina.initiator.AbstractSocketInitiator;
@@ -28,7 +29,7 @@
2829
* sessions.
2930
*/
3031
public class SocketInitiator extends AbstractSocketInitiator {
31-
private volatile Boolean isStarted = Boolean.FALSE;
32+
private final AtomicBoolean isStarted = new AtomicBoolean(false);
3233
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;
3334

3435
private SocketInitiator(Builder builder) throws ConfigError {
@@ -120,33 +121,33 @@ public void start() throws ConfigError, RuntimeError {
120121
}
121122

122123
private void initialize() throws ConfigError {
123-
if (isStarted.equals(Boolean.FALSE)) {
124-
eventHandlingStrategy.setExecutor(longLivedExecutor);
125-
createSessionInitiators();
126-
for (Session session : getSessionMap().values()) {
127-
Session.registerSession(session);
124+
synchronized (isStarted) {
125+
if (isStarted.compareAndSet(false, true)) {
126+
eventHandlingStrategy.setExecutor(longLivedExecutor);
127+
createSessionInitiators();
128+
for (Session session : getSessionMap().values()) {
129+
Session.registerSession(session);
130+
}
131+
startInitiators();
132+
eventHandlingStrategy.blockInThread();
128133
}
129-
startInitiators();
130-
eventHandlingStrategy.blockInThread();
131-
isStarted = Boolean.TRUE;
132-
} else {
133-
log.warn("Ignored attempt to start already running SocketInitiator.");
134134
}
135135
}
136136

137137
@Override
138138
public void stop(boolean forceDisconnect) {
139-
if (isStarted.equals(Boolean.TRUE)) {
140-
try {
141-
logoutAllSessions(forceDisconnect);
142-
stopInitiators();
143-
} finally {
139+
synchronized (isStarted) {
140+
if (isStarted.compareAndSet(true, false)) {
144141
try {
145-
eventHandlingStrategy.stopHandlingMessages(true);
142+
logoutAllSessions(forceDisconnect);
143+
stopInitiators();
146144
} finally {
147-
Session.unregisterSessions(getSessions(), true);
148-
clearConnectorSessions();
149-
isStarted = Boolean.FALSE;
145+
try {
146+
eventHandlingStrategy.stopHandlingMessages(true);
147+
} finally {
148+
Session.unregisterSessions(getSessions(), true);
149+
clearConnectorSessions();
150+
}
150151
}
151152
}
152153
}

quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import java.util.HashMap;
5757
import java.util.Iterator;
5858
import java.util.Map;
59+
import java.util.concurrent.ConcurrentHashMap;
60+
import java.util.concurrent.ConcurrentMap;
5961

6062
/**
6163
* Abstract base class for socket acceptors.
@@ -64,7 +66,7 @@ public abstract class AbstractSocketAcceptor extends SessionConnector implements
6466
private final Map<SocketAddress, AcceptorSessionProvider> sessionProviders = new HashMap<>();
6567
private final SessionFactory sessionFactory;
6668
private final Map<SocketAddress, AcceptorSocketDescriptor> socketDescriptorForAddress = new HashMap<>();
67-
private final Map<AcceptorSocketDescriptor, IoAcceptor> ioAcceptors = new HashMap<>();
69+
private final ConcurrentMap<AcceptorSocketDescriptor, IoAcceptor> ioAcceptors = new ConcurrentHashMap<>();
6870

6971
protected AbstractSocketAcceptor(SessionSettings settings, SessionFactory sessionFactory)
7072
throws ConfigError {

quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@
4747
import java.util.ArrayList;
4848
import java.util.Collections;
4949
import java.util.HashMap;
50-
import java.util.HashSet;
5150
import java.util.Iterator;
5251
import java.util.Map;
5352
import java.util.Set;
53+
import java.util.concurrent.ConcurrentHashMap;
5454
import java.util.concurrent.Executors;
5555
import java.util.concurrent.ScheduledExecutorService;
5656
import java.util.concurrent.ThreadFactory;
@@ -63,7 +63,7 @@
6363
public abstract class AbstractSocketInitiator extends SessionConnector implements Initiator {
6464

6565
protected final Logger log = LoggerFactory.getLogger(getClass());
66-
private final Set<IoSessionInitiator> initiators = new HashSet<>();
66+
private final Set<IoSessionInitiator> initiators = ConcurrentHashMap.newKeySet();
6767
private final ScheduledExecutorService scheduledReconnectExecutor;
6868
public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-";
6969

0 commit comments

Comments
 (0)