diff --git a/pom.xml b/pom.xml index 1c12bf1c5..54bd0edc0 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,8 @@ quickfixj-all quickfixj-distribution quickfixj-perf-test - + quickfixj-stress-test + UTF-8 @@ -69,7 +70,7 @@ 8 8 - 3.9.11 + 3.9.11 ${maven.version} ${maven.version} 3.3.1 @@ -102,7 +103,9 @@ OrchestraFIXLatest.xml 1.0.3 2.0.0 - 1.37 + 1.37 + 0.16 + 1.8.0.10 @@ -273,7 +276,7 @@ ${maven-compiler-plugin-version} true - true + true 2g 4g diff --git a/quickfixj-core/pom.xml b/quickfixj-core/pom.xml index 8b7de82dc..c3f804fb4 100644 --- a/quickfixj-core/pom.xml +++ b/quickfixj-core/pom.xml @@ -102,7 +102,7 @@ hsqldb hsqldb - 1.8.0.10 + ${hsqldb.version} test diff --git a/quickfixj-core/src/main/java/quickfix/JdbcStore.java b/quickfixj-core/src/main/java/quickfix/JdbcStore.java index a1f901e4e..0087e9de8 100644 --- a/quickfixj-core/src/main/java/quickfix/JdbcStore.java +++ b/quickfixj-core/src/main/java/quickfix/JdbcStore.java @@ -54,19 +54,9 @@ class JdbcStore implements MessageStore { public JdbcStore(SessionSettings settings, SessionID sessionID, DataSource ds) throws Exception { this.sessionID = sessionID; - if (settings.isSetting(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME)) { - sessionTableName = settings - .getString(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME); - } else { - sessionTableName = DEFAULT_SESSION_TABLE_NAME; - } - if (settings.isSetting(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME)) { - messageTableName = settings - .getString(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME); - } else { - messageTableName = DEFAULT_MESSAGE_TABLE_NAME; - } + sessionTableName = getSessionTableName(settings, sessionID); + messageTableName = getMessageTableName(settings, sessionID); if (settings.isSetting(sessionID, SETTING_JDBC_SESSION_ID_DEFAULT_PROPERTY_VALUE)) { defaultSessionIdPropertyValue = settings.getString(sessionID, @@ -90,34 +80,67 @@ public JdbcStore(SessionSettings settings, SessionID sessionID, DataSource ds) t loadCache(); } - private void setSqlStrings() { - String idWhereClause = JdbcUtil.getIDWhereClause(extendedSessionIdSupported); - String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported); - String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported); + public static String getSessionTableName(SessionSettings settings, SessionID sessionID) throws ConfigError { + if (settings.isSetting(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME)) { + return settings.getString(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME); + } else { + return DEFAULT_SESSION_TABLE_NAME; + } + } - SQL_UPDATE_SEQNUMS = "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " - + "outgoing_seqnum=? WHERE " + idWhereClause; + public static String getMessageTableName(SessionSettings settings, SessionID sessionID) throws ConfigError { + if (settings.isSetting(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME)) { + return settings.getString(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME); + } else { + return DEFAULT_MESSAGE_TABLE_NAME; + } + } - SQL_INSERT_SESSION = "INSERT INTO " + sessionTableName + " (" + idColumns - + ", creation_time,incoming_seqnum, outgoing_seqnum) VALUES (" + idPlaceholders - + ",?,?,?)"; + public static String getUpdateSequenceNumsSql(String sessionTableName, String idWhereClause) { + return "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " + "outgoing_seqnum=? WHERE " + idWhereClause; + } - SQL_GET_SEQNUMS = "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM " - + sessionTableName + " WHERE " + idWhereClause; + public static String getInsertSessionSql(String sessionTableName, String idColumns, String idPlaceholders) { + return "INSERT INTO " + sessionTableName + " (" + idColumns + ", creation_time,incoming_seqnum, outgoing_seqnum) VALUES (" + idPlaceholders + ",?,?,?)"; + } - SQL_UPDATE_MESSAGE = "UPDATE " + messageTableName + " SET message=? " + "WHERE " - + idWhereClause + " and msgseqnum=?"; + public static String getSequenceNumsSql(String sessionTableName, String idWhereClause) { + return "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM " + sessionTableName + " WHERE " + idWhereClause; + } - SQL_INSERT_MESSAGE = "INSERT INTO " + messageTableName + " (" + idColumns - + ", msgseqnum,message) VALUES (" + idPlaceholders + ",?,?)"; + public static String getUpdateMessageSql(String messageTableName, String idWhereClause) { + return "UPDATE " + messageTableName + " SET message=? " + "WHERE " + idWhereClause + " and msgseqnum=?"; + } - SQL_GET_MESSAGES = "SELECT message FROM " + messageTableName + " WHERE " + idWhereClause - + " and msgseqnum>=? and msgseqnum<=? " + "ORDER BY msgseqnum"; + public static String getInsertMessageSql(String messageTableName, String idColumns, String idPlaceholders) { + return "INSERT INTO " + messageTableName + " (" + idColumns + ", msgseqnum,message) VALUES (" + idPlaceholders + ",?,?)"; + } - SQL_UPDATE_SESSION = "UPDATE " + sessionTableName + " SET creation_time=?, " - + "incoming_seqnum=?, outgoing_seqnum=? " + "WHERE " + idWhereClause; + public static String getMessagesSql(String messageTableName, String idWhereClause) { + return "SELECT message FROM " + messageTableName + " WHERE " + idWhereClause + " and msgseqnum>=? and msgseqnum<=? " + "ORDER BY msgseqnum"; + } + + public static String getUpdateSessionSql(String sessionTableName, String idWhereClause) { + return "UPDATE " + sessionTableName + " SET creation_time=?, " + "incoming_seqnum=?, outgoing_seqnum=? " + "WHERE " + idWhereClause; + } - SQL_DELETE_MESSAGES = "DELETE FROM " + messageTableName + " WHERE " + idWhereClause; + public static String getDeleteMessagesSql(String messageTableName, String idWhereClause) { + return "DELETE FROM " + messageTableName + " WHERE " + idWhereClause; + } + + private void setSqlStrings() { + String idWhereClause = JdbcUtil.getIDWhereClause(extendedSessionIdSupported); + String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported); + String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported); + + SQL_UPDATE_SEQNUMS = getUpdateSequenceNumsSql(sessionTableName, idWhereClause); + SQL_INSERT_SESSION = getInsertSessionSql(sessionTableName, idColumns, idPlaceholders); + SQL_GET_SEQNUMS = getSequenceNumsSql(sessionTableName, idWhereClause); + SQL_UPDATE_MESSAGE = getUpdateMessageSql(messageTableName, idWhereClause); + SQL_INSERT_MESSAGE = getInsertMessageSql(messageTableName, idColumns, idPlaceholders); + SQL_GET_MESSAGES = getMessagesSql(messageTableName, idWhereClause); + SQL_UPDATE_SESSION = getUpdateSessionSql(sessionTableName, idWhereClause); + SQL_DELETE_MESSAGES = getDeleteMessagesSql(messageTableName, idWhereClause); } private void loadCache() throws SQLException, IOException { @@ -238,7 +261,6 @@ public void get(int startSequence, int endSequence, Collection messages) public boolean set(int sequence, String message) throws IOException { Connection connection = null; PreparedStatement insert = null; - ResultSet rs = null; try { connection = dataSource.getConnection(); insert = connection.prepareStatement(SQL_INSERT_MESSAGE); @@ -263,7 +285,6 @@ public boolean set(int sequence, String message) throws IOException { } } } finally { - JdbcUtil.close(sessionID, rs); JdbcUtil.close(sessionID, insert); JdbcUtil.close(sessionID, connection); } diff --git a/quickfixj-distribution/pom.xml b/quickfixj-distribution/pom.xml index 5d6854d74..b5f045c06 100644 --- a/quickfixj-distribution/pom.xml +++ b/quickfixj-distribution/pom.xml @@ -122,11 +122,16 @@ quickfixj-messages-fix40 ${project.version} - - org.quickfixj - quickfixj-perf-test - ${project.version} - + + org.quickfixj + quickfixj-perf-test + ${project.version} + + + org.quickfixj + quickfixj-stress-test + ${project.version} + com.sleepycat je diff --git a/quickfixj-stress-test/README.md b/quickfixj-stress-test/README.md new file mode 100644 index 000000000..179fb1bd5 --- /dev/null +++ b/quickfixj-stress-test/README.md @@ -0,0 +1,80 @@ +# QuickFIX/J Java Concurrency Stress Test + +This is a [Java Concurrency Stress (jcstress)](https://github.com/openjdk/jcstress) concurrency stress testing module for QuickFIX/J FIX protocol implementation. + +## How to Run + +Concurrency stress testing classes can be individually run using your favorite IDE or command line. + +### Building the Executable JAR + +#### Build Full Project + +To build the entire project including all modules: + +```bash +$ mvn clean package +``` + +#### Build Only `quickfixj-stress-test` Module + +Build the stress test module with required dependencies, skipping test execution: + +```bash +$ mvn clean package -pl quickfixj-stress-test -am -PskipAT,skipBundlePlugin -DskipTests +``` + +**Command Options Explained:** +- `-pl quickfixj-stress-test` - Build only the stress test module +- `-am` - Also build required dependency modules +- `-PskipAT,skipBundlePlugin` - Skip acceptance tests and bundle plugin +- `-DskipTests` - Skip running unit test cases during build + +### Running Tests + +#### Run Complete Test Suite + +Execute all concurrency regression test cases: + +```bash +$ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar +``` + +#### Run a Single Test + +Run a specific test class or test group (if test cases are nested): + +```bash +$ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar -t JdbcStoreStressTest +``` + +#### View Available Options + +Display all available command-line options: + +```bash +$ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar -h +``` + +## Common Options + +Some useful jcstress options include: + +- `h` - Display help +- `-t ` - Run specific test or test group +- `-v` - Verbose mode +- `-jvmArgs ` - Additional JVM arguments + +## Additional Notes + +- For CI/CD integration, use the full path to the JAR file +- Stress tests may take significant time to complete depending on your system resources +- Review jcstress documentation for advanced configuration options + +## Troubleshooting + +If you encounter issues: + +- Verify the JAR file was built successfully in `quickfixj-stress-test/target/` +- Check that you have sufficient system resources (CPU, memory) +- Review the jcstress output for specific error messages \ No newline at end of file diff --git a/quickfixj-stress-test/pom.xml b/quickfixj-stress-test/pom.xml new file mode 100644 index 000000000..7b2738411 --- /dev/null +++ b/quickfixj-stress-test/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + + + org.quickfixj + quickfixj-parent + 3.0.0-SNAPSHOT + + + quickfixj-stress-test + jar + + QuickFIX/J Stress test + QuickFIX/J Stress test + http://www.quickfixj.org + + + + org.quickfixj + quickfixj-core + ${project.version} + + + org.openjdk.jcstress + jcstress-core + ${jcstress.version} + + + org.slf4j + slf4j-jdk14 + ${slf4j.version} + + + org.mockito + mockito-core + compile + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + create-jcstress-jar + package + + shade + + + ${project.artifactId} + + + org.openjdk.jcstress.Main + + + META-INF/TestList + + + false + true + standalone + + + + + + + \ No newline at end of file diff --git a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java new file mode 100644 index 000000000..97f3fab22 --- /dev/null +++ b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java @@ -0,0 +1,805 @@ +package quickfix; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.JJJJ_Result; + +import javax.sql.DataSource; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +@SuppressWarnings("unused") +public class JdbcStoreStressTest { + + private static final SessionID SESSION_ID = new SessionID(FixVersions.BEGINSTRING_FIX44, "JDBC_INITIATOR", "JDBC_ACCEPTOR"); + + private static JdbcStoreWrapper createWrapper() throws Exception { + ResultSet columnsResultSet = mock(ResultSet.class); + doReturn(true).when(columnsResultSet).next(); + + SessionSettings settings = new SessionSettings(); + String sessionTableName = JdbcStore.getSessionTableName(settings, SESSION_ID); + String messageTableName = JdbcStore.getMessageTableName(settings, SESSION_ID); + + DatabaseMetaData metaData = mock(DatabaseMetaData.class); + doReturn(columnsResultSet).when(metaData).getColumns(null, null, sessionTableName.toUpperCase(), "SENDERSUBID"); + + Connection connection = mock(Connection.class); + doReturn(metaData).when(connection).getMetaData(); + + DataSource dataSource = mock(DataSource.class); + doReturn(connection).when(dataSource).getConnection(); + + boolean extendedSessionIdSupport = JdbcUtil.determineSessionIdSupport(dataSource, sessionTableName); + String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupport); + String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupport); + String idWhereClause = JdbcUtil.getIDWhereClause(extendedSessionIdSupport); + + // GET SEQUENCE NUMS + ResultSet getSequenceResult = mock(ResultSet.class); + doReturn(false).when(getSequenceResult).next(); + + PreparedStatement getSequenceNumsQuery = mock(PreparedStatement.class); + doReturn(getSequenceResult).when(getSequenceNumsQuery).executeQuery(); + + String sequenceNumsSql = JdbcStore.getSequenceNumsSql(sessionTableName, idWhereClause); + doReturn(getSequenceNumsQuery).when(connection).prepareStatement(sequenceNumsSql); + + // INSERT SESSION + PreparedStatement insertSessionQuery = mock(PreparedStatement.class); + + String insertSessionSql = JdbcStore.getInsertSessionSql(sessionTableName, idColumns, idPlaceholders); + doReturn(insertSessionQuery).when(connection).prepareStatement(insertSessionSql); + + // UPDATE SEQUENCE NUMS + Database database = new Database(); + + String updateSequenceNumsSql = JdbcStore.getUpdateSequenceNumsSql(sessionTableName, idWhereClause); + doAnswer(invocationOnMock -> new UpdateSequenceStatement(database)).when(connection).prepareStatement(updateSequenceNumsSql); + + JdbcStore jdbcStore = new JdbcStore(settings, SESSION_ID, dataSource); + + if (jdbcStore.getNextSenderMsgSeqNum() != 1) { + throw new IllegalStateException("Invalid next sender sequence: " + jdbcStore.getNextSenderMsgSeqNum()); + } + + if (jdbcStore.getNextTargetMsgSeqNum() != 1) { + throw new IllegalStateException("Invalid next target sequence: " + jdbcStore.getNextTargetMsgSeqNum()); + } + + return new JdbcStoreWrapper(jdbcStore, database); + } + + /** + *
+     *   JVM args: [-Dfile.encoding=UTF-8, -XX:-UseBiasedLocking, -XX:+StressLCM, -XX:+StressGCM]
+     *
+     *       RESULT  SAMPLES     FREQ      EXPECT  DESCRIPTION
+     *   2, 1, 2, 1        0    0,00%   Forbidden  Invalid source and target sequences stored in the database
+     *   2, 1, 2, 2        0    0,00%   Forbidden  Invalid source sequence stored in the database
+     *   2, 2, 2, 1        1   <0,01%   Forbidden  Invalid target sequence stored in the database
+     *   2, 2, 2, 2   30,886  100,00%  Acceptable
+     *
+     * 
+ */ + @State + @JCStressTest + @Outcome(id = "2, 2, 2, 2", expect = Expect.ACCEPTABLE) + @Outcome(id = "2, 2, 2, 1", expect = Expect.FORBIDDEN, desc = "Invalid target sequence stored in the database") + @Outcome(id = "2, 1, 2, 2", expect = Expect.FORBIDDEN, desc = "Invalid source sequence stored in the database") + @Outcome(id = "2, 1, 2, 1", expect = Expect.FORBIDDEN, desc = "Invalid source and target sequences stored in the database") + public static class SingleSenderSequenceTest { + + private final JdbcStoreWrapper underTest; + + public SingleSenderSequenceTest() { + try { + this.underTest = createWrapper(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // application thread + @Actor + public void incrementSender() { + underTest.incrementSenderSequence(); + } + + // QFJ Message Processor + @Actor + public void incrementTarget() { + underTest.incrementTargetSequence(); + } + + @Arbiter + public void captureResult(JJJJ_Result result) { + result.r1 = underTest.getCacheSenderSequence(); + result.r2 = underTest.getCacheTargetSequence(); + result.r3 = underTest.getDbSenderSequence(); + result.r4 = underTest.getDbTargetSequence(); + } + } + + /** + *
+     *   JVM args: [-Dfile.encoding=UTF-8, -XX:-UseBiasedLocking, -XX:+StressLCM, -XX:+StressGCM]
+     *
+     *       RESULT  SAMPLES     FREQ      EXPECT  DESCRIPTION
+     *   3, 2, 2, 2        1   <0,01%   Forbidden
+     *   3, 2, 3, 2   46,502  100,00%  Acceptable
+     *
+     * 
+ */ + @State + @JCStressTest + @Outcome(id = "3, 2, 3, 2", expect = Expect.ACCEPTABLE) + @Outcome(expect = Expect.FORBIDDEN) + public static class TwoSendersSequenceTest { + + private final JdbcStoreWrapper underTest; + + public TwoSendersSequenceTest() { + try { + this.underTest = createWrapper(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // application thread + @Actor + public void incrementSender1() { + underTest.incrementSenderSequence(); + } + + // application thread + @Actor + public void incrementSender2() { + underTest.incrementSenderSequence(); + } + + // QFJ Message Processor + @Actor + public void incrementTarget() { + underTest.incrementTargetSequence(); + } + + @Arbiter + public void captureResult(JJJJ_Result result) { + result.r1 = underTest.getCacheSenderSequence(); + result.r2 = underTest.getCacheTargetSequence(); + result.r3 = underTest.getDbSenderSequence(); + result.r4 = underTest.getDbTargetSequence(); + } + } + + private static final class JdbcStoreWrapper { + + private final JdbcStore store; + private final Database database; + private final Lock senderSequenceLock; + private final Lock targetSequenceLock; + + public JdbcStoreWrapper(JdbcStore store, Database database) { + this.store = store; + this.database = database; + this.senderSequenceLock = new ReentrantLock(); + this.targetSequenceLock = new ReentrantLock(); + } + + public void incrementSenderSequence() { + senderSequenceLock.lock(); + + try { + store.incrNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + senderSequenceLock.unlock(); + } + } + + public void incrementTargetSequence() { + targetSequenceLock.lock(); + + try { + store.incrNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + targetSequenceLock.unlock(); + } + } + + public int getCacheSenderSequence() { + try { + return store.getNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getCacheTargetSequence() { + try { + return store.getNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getDbSenderSequence() { + return database.senderSequence; + } + + public int getDbTargetSequence() { + return database.targetSequence; + } + } + + private static final class Database { + + private final Lock lock; + private int senderSequence; + private int targetSequence; + + public Database() { + this.lock = new ReentrantLock(); + this.senderSequence = -1; + this.targetSequence = -1; + } + + public void update(int senderSequence, int targetSequence) { + lock.lock(); + + try { + this.senderSequence = senderSequence; + this.targetSequence = targetSequence; + } finally { + lock.unlock(); + } + } + } + + private static final class UpdateSequenceStatement implements PreparedStatement { + + private final Database database; + private int senderSequence; + private int targetSequence; + + public UpdateSequenceStatement(Database database) { + this.database = database; + this.senderSequence = -1; + this.targetSequence = -1; + } + + @Override + public ResultSet executeQuery() { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate() { + throw new UnsupportedOperationException(); + } + + @Override + public void setNull(int parameterIndex, int sqlType) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBoolean(int parameterIndex, boolean x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setByte(int parameterIndex, byte x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setShort(int parameterIndex, short x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setInt(int parameterIndex, int x) { + if (parameterIndex == 1) { + targetSequence = x; + } else if (parameterIndex == 2) { + senderSequence = x; + } + } + + @Override + public void setLong(int parameterIndex, long x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFloat(int parameterIndex, float x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDouble(int parameterIndex, double x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setString(int parameterIndex, String x) { + } + + @Override + public void setBytes(int parameterIndex, byte[] x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDate(int parameterIndex, Date x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTime(int parameterIndex, Time x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void clearParameters() { + throw new UnsupportedOperationException(); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) { + throw new UnsupportedOperationException(); + } + + @Override + public void setObject(int parameterIndex, Object x) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute() { + database.update(senderSequence, targetSequence); + return true; + } + + @Override + public void addBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setRef(int parameterIndex, Ref x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBlob(int parameterIndex, Blob x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setClob(int parameterIndex, Clob x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setArray(int parameterIndex, Array x) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSetMetaData getMetaData() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) { + throw new UnsupportedOperationException(); + } + + @Override + public void setURL(int parameterIndex, URL x) { + throw new UnsupportedOperationException(); + } + + @Override + public ParameterMetaData getParameterMetaData() { + throw new UnsupportedOperationException(); + } + + @Override + public void setRowId(int parameterIndex, RowId x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNString(int parameterIndex, String value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNClob(int parameterIndex, NClob value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setClob(int parameterIndex, Reader reader, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNClob(int parameterIndex, Reader reader, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML xmlObject) { + throw new UnsupportedOperationException(); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) { + throw new UnsupportedOperationException(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setClob(int parameterIndex, Reader reader) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNClob(int parameterIndex, Reader reader) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSet executeQuery(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + } + + @Override + public int getMaxFieldSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void setMaxFieldSize(int max) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxRows() { + throw new UnsupportedOperationException(); + } + + @Override + public void setMaxRows(int max) { + throw new UnsupportedOperationException(); + } + + @Override + public void setEscapeProcessing(boolean enable) { + throw new UnsupportedOperationException(); + } + + @Override + public int getQueryTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setQueryTimeout(int seconds) { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel() { + throw new UnsupportedOperationException(); + } + + @Override + public SQLWarning getWarnings() { + throw new UnsupportedOperationException(); + } + + @Override + public void clearWarnings() { + throw new UnsupportedOperationException(); + } + + @Override + public void setCursorName(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSet getResultSet() { + throw new UnsupportedOperationException(); + } + + @Override + public int getUpdateCount() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getMoreResults() { + throw new UnsupportedOperationException(); + } + + @Override + public T unwrap(Class iface) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWrapperFor(Class iface) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFetchDirection(int direction) { + throw new UnsupportedOperationException(); + } + + @Override + public int getFetchDirection() { + throw new UnsupportedOperationException(); + } + + @Override + public void setFetchSize(int rows) { + throw new UnsupportedOperationException(); + } + + @Override + public int getFetchSize() { + throw new UnsupportedOperationException(); + } + + @Override + public int getResultSetConcurrency() { + throw new UnsupportedOperationException(); + } + + @Override + public int getResultSetType() { + throw new UnsupportedOperationException(); + } + + @Override + public void addBatch(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public void clearBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public int[] executeBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public Connection getConnection() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getMoreResults(int current) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSet getGeneratedKeys() { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql, String[] columnNames) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql, int[] columnIndexes) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql, String[] columnNames) { + throw new UnsupportedOperationException(); + } + + @Override + public int getResultSetHoldability() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isClosed() { + throw new UnsupportedOperationException(); + } + + @Override + public void setPoolable(boolean poolable) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isPoolable() { + throw new UnsupportedOperationException(); + } + + @Override + public void closeOnCompletion() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCloseOnCompletion() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java new file mode 100644 index 000000000..1ec31a053 --- /dev/null +++ b/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java @@ -0,0 +1,144 @@ +package quickfix; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.JJ_Result; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@SuppressWarnings("unused") +public class MemoryStoreStressTest { + + @State + @JCStressTest + @Outcome(id = "2, 2", expect = Expect.ACCEPTABLE) + @Outcome(id = "1, 2", expect = Expect.FORBIDDEN, desc = "Source sequence update lost") + @Outcome(id = "2, 1", expect = Expect.FORBIDDEN, desc = "Target sequence update lost") + public static class SingleSenderSequenceTest { + + private final MemoryStoreWrapper underTest; + + public SingleSenderSequenceTest() { + this.underTest = new MemoryStoreWrapper(); + } + + // application thread + @Actor + public void incrementSender() { + underTest.incrementSenderSequence(); + } + + // QFJ Message Processor + @Actor + public void incrementTarget() { + underTest.incrementTargetSequence(); + } + + @Arbiter + public void captureResult(JJ_Result result) { + result.r1 = underTest.getSenderSequence(); + result.r2 = underTest.getTargetSequence(); + } + } + + @State + @JCStressTest + @Outcome(id = "3, 2", expect = Expect.ACCEPTABLE) + @Outcome(expect = Expect.FORBIDDEN) + public static class TwoSendersSequenceTest { + + private final MemoryStoreWrapper underTest; + + public TwoSendersSequenceTest() { + this.underTest = new MemoryStoreWrapper(); + } + + // application thread + @Actor + public void incrementSender1() { + underTest.incrementSenderSequence(); + } + + // application thread + @Actor + public void incrementSender2() { + underTest.incrementSenderSequence(); + } + + // QFJ Message Processor + @Actor + public void incrementTarget() { + underTest.incrementTargetSequence(); + } + + @Arbiter + public void captureResult(JJ_Result result) { + result.r1 = underTest.getSenderSequence(); + result.r2 = underTest.getTargetSequence(); + } + } + + private static final class MemoryStoreWrapper { + + private final MemoryStore store; + private final Lock senderSequenceLock; + private final Lock targetSequenceLock; + + public MemoryStoreWrapper() { + try { + this.store = new MemoryStore(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + this.senderSequenceLock = new ReentrantLock(); + this.targetSequenceLock = new ReentrantLock(); + } + + public void incrementSenderSequence() { + senderSequenceLock.lock(); + + try { + store.incrNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + senderSequenceLock.unlock(); + } + } + + public void incrementTargetSequence() { + targetSequenceLock.lock(); + + try { + store.incrNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + targetSequenceLock.unlock(); + } + } + + public int getSenderSequence() { + try { + return store.getNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getTargetSequence() { + try { + return store.getNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +}