From 1257b8771dd7cb3e76f99b9a7b454058c42cd453 Mon Sep 17 00:00:00 2001 From: Marcin Date: Sat, 27 Sep 2025 13:13:10 +0200 Subject: [PATCH 1/2] JDBC store testing with new jcstress suite --- pom.xml | 11 +- quickfixj-core/pom.xml | 2 +- .../src/main/java/quickfix/JdbcStore.java | 115 ++- quickfixj-distribution/pom.xml | 15 +- quickfixj-stress-test/README.md | 83 ++ quickfixj-stress-test/pom.xml | 79 ++ .../quickfix/JdbcStoreHsqldbStressTest.java | 214 +++++ .../java/quickfix/JdbcStoreStressTest.java | 756 ++++++++++++++++++ .../main/java/quickfix/StressTestDbUtil.java | 77 ++ 9 files changed, 1308 insertions(+), 44 deletions(-) create mode 100644 quickfixj-stress-test/README.md create mode 100644 quickfixj-stress-test/pom.xml create mode 100644 quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java create mode 100644 quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java create mode 100644 quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java diff --git a/pom.xml b/pom.xml index 582c3b5af..6022ae1a3 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,8 @@ quickfixj-all quickfixj-distribution quickfixj-perf-test - + quickfixj-stress-test + UTF-8 @@ -69,7 +70,7 @@ 8 8 - 3.9.11 + 3.9.11 ${maven.version} ${maven.version} 3.3.1 @@ -102,7 +103,9 @@ OrchestraFIXLatest.xml 1.0.2 2.0.0 - 1.37 + 1.37 + 0.16 + 1.8.0.10 @@ -273,7 +276,7 @@ ${maven-compiler-plugin-version} true - true + true 2g 4g diff --git a/quickfixj-core/pom.xml b/quickfixj-core/pom.xml index 453a5403d..ae384fedf 100644 --- a/quickfixj-core/pom.xml +++ b/quickfixj-core/pom.xml @@ -102,7 +102,7 @@ hsqldb hsqldb - 1.8.0.10 + ${hsqldb.version} test diff --git a/quickfixj-core/src/main/java/quickfix/JdbcStore.java b/quickfixj-core/src/main/java/quickfix/JdbcStore.java index a1f901e4e..1d2eee8e9 100644 --- a/quickfixj-core/src/main/java/quickfix/JdbcStore.java +++ b/quickfixj-core/src/main/java/quickfix/JdbcStore.java @@ -54,19 +54,9 @@ class JdbcStore implements MessageStore { public JdbcStore(SessionSettings settings, SessionID sessionID, DataSource ds) throws Exception { this.sessionID = sessionID; - if (settings.isSetting(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME)) { - sessionTableName = settings - .getString(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME); - } else { - sessionTableName = DEFAULT_SESSION_TABLE_NAME; - } - if (settings.isSetting(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME)) { - messageTableName = settings - .getString(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME); - } else { - messageTableName = DEFAULT_MESSAGE_TABLE_NAME; - } + sessionTableName = getSessionTableName(settings, sessionID); + messageTableName = getMessageTableName(settings, sessionID); if (settings.isSetting(sessionID, SETTING_JDBC_SESSION_ID_DEFAULT_PROPERTY_VALUE)) { defaultSessionIdPropertyValue = settings.getString(sessionID, @@ -90,34 +80,67 @@ public JdbcStore(SessionSettings settings, SessionID sessionID, DataSource ds) t loadCache(); } - private void setSqlStrings() { - String idWhereClause = JdbcUtil.getIDWhereClause(extendedSessionIdSupported); - String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported); - String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported); + public static String getSessionTableName(SessionSettings settings, SessionID sessionID) throws ConfigError { + if (settings.isSetting(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME)) { + return settings.getString(sessionID, SETTING_JDBC_STORE_SESSIONS_TABLE_NAME); + } else { + return DEFAULT_SESSION_TABLE_NAME; + } + } + + public static String getMessageTableName(SessionSettings settings, SessionID sessionID) throws ConfigError { + if (settings.isSetting(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME)) { + return settings.getString(sessionID, SETTING_JDBC_STORE_MESSAGES_TABLE_NAME); + } else { + return DEFAULT_MESSAGE_TABLE_NAME; + } + } + + public static String getUpdateSequenceNumsSql(String sessionTableName, String idWhereClause) { + return "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " + "outgoing_seqnum=? WHERE " + idWhereClause; + } + + public static String getInsertSessionSql(String sessionTableName, String idColumns, String idPlaceholders) { + return "INSERT INTO " + sessionTableName + " (" + idColumns + ", creation_time,incoming_seqnum, outgoing_seqnum) VALUES (" + idPlaceholders + ",?,?,?)"; + } - SQL_UPDATE_SEQNUMS = "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " - + "outgoing_seqnum=? WHERE " + idWhereClause; + public static String getSequenceNumsSql(String sessionTableName, String idWhereClause) { + return "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM " + sessionTableName + " WHERE " + idWhereClause; + } - SQL_INSERT_SESSION = "INSERT INTO " + sessionTableName + " (" + idColumns - + ", creation_time,incoming_seqnum, outgoing_seqnum) VALUES (" + idPlaceholders - + ",?,?,?)"; + public static String getUpdateMessageSql(String messageTableName, String idWhereClause) { + return "UPDATE " + messageTableName + " SET message=? " + "WHERE " + idWhereClause + " and msgseqnum=?"; + } - SQL_GET_SEQNUMS = "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM " - + sessionTableName + " WHERE " + idWhereClause; + public static String getInsertMessageSql(String messageTableName, String idColumns, String idPlaceholders) { + return "INSERT INTO " + messageTableName + " (" + idColumns + ", msgseqnum,message) VALUES (" + idPlaceholders + ",?,?)"; + } - SQL_UPDATE_MESSAGE = "UPDATE " + messageTableName + " SET message=? " + "WHERE " - + idWhereClause + " and msgseqnum=?"; + public static String getMessagesSql(String messageTableName, String idWhereClause) { + return "SELECT message FROM " + messageTableName + " WHERE " + idWhereClause + " and msgseqnum>=? and msgseqnum<=? " + "ORDER BY msgseqnum"; + } - SQL_INSERT_MESSAGE = "INSERT INTO " + messageTableName + " (" + idColumns - + ", msgseqnum,message) VALUES (" + idPlaceholders + ",?,?)"; + public static String getUpdateSessionSql(String sessionTableName, String idWhereClause) { + return "UPDATE " + sessionTableName + " SET creation_time=?, " + "incoming_seqnum=?, outgoing_seqnum=? " + "WHERE " + idWhereClause; + } - SQL_GET_MESSAGES = "SELECT message FROM " + messageTableName + " WHERE " + idWhereClause - + " and msgseqnum>=? and msgseqnum<=? " + "ORDER BY msgseqnum"; + public static String getDeleteMessagesSql(String messageTableName, String idWhereClause) { + return "DELETE FROM " + messageTableName + " WHERE " + idWhereClause; + } - SQL_UPDATE_SESSION = "UPDATE " + sessionTableName + " SET creation_time=?, " - + "incoming_seqnum=?, outgoing_seqnum=? " + "WHERE " + idWhereClause; + private void setSqlStrings() { + String idWhereClause = JdbcUtil.getIDWhereClause(extendedSessionIdSupported); + String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported); + String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported); - SQL_DELETE_MESSAGES = "DELETE FROM " + messageTableName + " WHERE " + idWhereClause; + SQL_UPDATE_SEQNUMS = getUpdateSequenceNumsSql(sessionTableName, idWhereClause); + SQL_INSERT_SESSION = getInsertSessionSql(sessionTableName, idColumns, idPlaceholders); + SQL_GET_SEQNUMS = getSequenceNumsSql(sessionTableName, idWhereClause); + SQL_UPDATE_MESSAGE = getUpdateMessageSql(messageTableName, idWhereClause); + SQL_INSERT_MESSAGE = getInsertMessageSql(messageTableName, idColumns, idPlaceholders); + SQL_GET_MESSAGES = getMessagesSql(messageTableName, idWhereClause); + SQL_UPDATE_SESSION = getUpdateSessionSql(sessionTableName, idWhereClause); + SQL_DELETE_MESSAGES = getDeleteMessagesSql(messageTableName, idWhereClause); } private void loadCache() throws SQLException, IOException { @@ -171,6 +194,32 @@ public int getNextTargetMsgSeqNum() throws IOException { return cache.getNextTargetMsgSeqNum(); } + int getNextSenderMsgSeqNumFromDb() throws SQLException { + return getNextMsgSeqNumsFromDb()[0]; + } + + int getNextTargetMsgSeqNumFromDb() throws SQLException { + return getNextMsgSeqNumsFromDb()[1]; + } + + int[] getNextMsgSeqNumsFromDb() throws SQLException { + try (Connection connection = dataSource.getConnection()) { + try (PreparedStatement query = connection.prepareStatement(SQL_GET_SEQNUMS)) { + setSessionIdParameters(query, 1); + + try (ResultSet result = query.executeQuery()) { + if (result.next()) { + int targetSeqNum = result.getInt(2); + int senderSeqNum = result.getInt(3); + return new int[] {senderSeqNum, targetSeqNum}; + } else { + return new int[] {-1, -1}; + } + } + } + } + } + public void incrNextSenderMsgSeqNum() throws IOException { cache.incrNextSenderMsgSeqNum(); setNextSenderMsgSeqNum(cache.getNextSenderMsgSeqNum()); @@ -238,7 +287,6 @@ public void get(int startSequence, int endSequence, Collection messages) public boolean set(int sequence, String message) throws IOException { Connection connection = null; PreparedStatement insert = null; - ResultSet rs = null; try { connection = dataSource.getConnection(); insert = connection.prepareStatement(SQL_INSERT_MESSAGE); @@ -263,7 +311,6 @@ public boolean set(int sequence, String message) throws IOException { } } } finally { - JdbcUtil.close(sessionID, rs); JdbcUtil.close(sessionID, insert); JdbcUtil.close(sessionID, connection); } diff --git a/quickfixj-distribution/pom.xml b/quickfixj-distribution/pom.xml index 5d6854d74..b5f045c06 100644 --- a/quickfixj-distribution/pom.xml +++ b/quickfixj-distribution/pom.xml @@ -122,11 +122,16 @@ quickfixj-messages-fix40 ${project.version} - - org.quickfixj - quickfixj-perf-test - ${project.version} - + + org.quickfixj + quickfixj-perf-test + ${project.version} + + + org.quickfixj + quickfixj-stress-test + ${project.version} + com.sleepycat je diff --git a/quickfixj-stress-test/README.md b/quickfixj-stress-test/README.md new file mode 100644 index 000000000..42db2f780 --- /dev/null +++ b/quickfixj-stress-test/README.md @@ -0,0 +1,83 @@ +# QuickFIX/J Java Concurrency Stress Test + +This is a [Java Concurrency Stress (jcstress)](https://github.com/openjdk/jcstress) concurrency stress testing module for QuickFIX/J FIX protocol implementation. + +## How to Run + +Concurrency stress testing classes can be individually run using your favorite IDE or command line. + +### Building the Executable JAR + +#### Build Full Project + +To build the entire project including all modules: + +```bash +$ mvn clean package +``` + +#### Build Only `quickfixj-stress-test` Module + +Build the stress test module with required dependencies, skipping test execution: + +```bash +$ mvn clean package -pl quickfixj-stress-test -am -PskipAT,skipBundlePlugin -DskipTests +``` + +**Command Options Explained:** +- `-pl quickfixj-stress-test` - Build only the stress test module +- `-am` - Also build required dependency modules +- `-PskipAT,skipBundlePlugin` - Skip acceptance tests and bundle plugin +- `-DskipTests` - Skip running unit test cases during build + +### Running Tests + +#### Run Complete Test Suite + +Execute all concurrency regression test cases: + +```bash +$ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar +``` + +#### Run a Single Test + +Run a specific test class or test group (if test cases are nested): + +```bash +$ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar -t JdbcStoreStressTest +``` + +#### View Available Options + +Display all available command-line options: + +```bash +$ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar -h +``` + +## Common Options + +Some useful jcstress options include: + +- `-t ` - Run specific test or test group +- `-v` - Verbose mode +- `-time ` - Duration for each test iteration +- `-iters ` - Number of iterations +- `-jvmArgs ` - Additional JVM arguments + +## Additional Notes + +- Test results are generated in the `results/` directory by default +- For CI/CD integration, use the full path to the JAR file +- Stress tests may take significant time to complete depending on your system resources +- Review jcstress documentation for advanced configuration options + +## Troubleshooting + +If you encounter issues: + +1. Ensure all prerequisites are met +2. Verify the JAR file was built successfully in `quickfixj-stress-test/target/` +3. Check that you have sufficient system resources (CPU, memory) +4. Review the jcstress output for specific error messages \ No newline at end of file diff --git a/quickfixj-stress-test/pom.xml b/quickfixj-stress-test/pom.xml new file mode 100644 index 000000000..7d8e05e57 --- /dev/null +++ b/quickfixj-stress-test/pom.xml @@ -0,0 +1,79 @@ + + + 4.0.0 + + + org.quickfixj + quickfixj-parent + 3.0.0-SNAPSHOT + + + quickfixj-stress-test + jar + + QuickFIX/J Stress test + QuickFIX/J Stress test + http://www.quickfixj.org + + + + org.quickfixj + quickfixj-core + ${project.version} + + + org.openjdk.jcstress + jcstress-core + ${jcstress.version} + + + hsqldb + hsqldb + ${hsqldb.version} + + + org.slf4j + slf4j-jdk14 + ${slf4j.version} + + + org.mockito + mockito-core + compile + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + create-jcstress-jar + package + + shade + + + ${project.artifactId} + + + org.openjdk.jcstress.Main + + + META-INF/TestList + + + false + true + standalone + + + + + + + \ No newline at end of file diff --git a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java new file mode 100644 index 000000000..04eec08db --- /dev/null +++ b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java @@ -0,0 +1,214 @@ +package quickfix; + +import org.apache.mina.util.AvailablePortFinder; +import org.hsqldb.Server; +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.JJJJ_Result; + +import javax.sql.DataSource; +import java.io.IOException; +import java.sql.SQLException; +import java.sql.Wrapper; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@SuppressWarnings("unused") +public class JdbcStoreHsqldbStressTest { + + private static final SessionID SESSION_ID = new SessionID(FixVersions.BEGINSTRING_FIX44, "JDBC_INITIATOR", "JDBC_ACCEPTOR"); + + private static JdbcStoreWrapper createWrapper() throws Exception { + int dbPort = AvailablePortFinder.getNextAvailable(); + + Server dbServer = new Server(); + dbServer.setDatabaseName(0, "quickfix-jdbc-test"); + dbServer.setDatabasePath(0, "mem:quickfix-jdbc-test"); + dbServer.setAddress("127.0.0.1"); + dbServer.setPort(dbPort); + dbServer.start(); + + String connectionUrl = "jdbc:hsqldb:hsql://127.0.0.1:" + dbPort + "/quickfix-jdbc-test"; + + SessionSettings settings = new SessionSettings(); + settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_DRIVER, "org.hsqldb.jdbcDriver"); + settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_CONNECTION_URL, connectionUrl); + settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_USER, "SA"); + settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_PASSWORD, ""); + settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_CONNECTION_TEST_QUERY, "CALL NOW()"); + + DataSource dataSource = JdbcUtil.getDataSource(settings, SESSION_ID); + StressTestDbUtil.initTables(dataSource); + + JdbcStore store = new JdbcStore(settings, SESSION_ID, null); + + return new JdbcStoreWrapper(store, dbServer); + } + + @State + @JCStressTest + @Outcome(id = "2, 2, 2, 2", expect = Expect.ACCEPTABLE) + public static class SingleSenderSequenceTest { + + private final JdbcStoreWrapper underTest; + + public SingleSenderSequenceTest() { + try { + this.underTest = createWrapper(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // application thread + @SuppressWarnings("unused") + @Actor + public void incrementSender() { + underTest.incrementSenderSeqNum(); + } + + // QFJ Message Processor + @SuppressWarnings("unused") + @Actor + public void incrementTarget() { + underTest.incrementTargetSeqNum(); + } + + @Arbiter + public void captureResult(JJJJ_Result result) { + result.r1 = underTest.getCacheSenderSequence(); + result.r2 = underTest.getCacheTargetSequence(); + + int[] seqs = underTest.getNextMsgSeqNumsFromDb(); + + result.r3 = seqs[0]; + result.r4 = seqs[1]; + + underTest.close(); + } + } + + @State + @JCStressTest + @Outcome(id = "3, 2, 3, 2", expect = Expect.ACCEPTABLE) + public static class TwoSendersSequenceTest { + + private final JdbcStoreWrapper underTest; + + public TwoSendersSequenceTest() { + try { + this.underTest = createWrapper(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // application thread + @SuppressWarnings("unused") + @Actor + public void incrementSender1() { + underTest.incrementSenderSeqNum(); + } + + // application thread + @SuppressWarnings("unused") + @Actor + public void incrementSender2() { + underTest.incrementSenderSeqNum(); + } + + // QFJ Message Processor + @SuppressWarnings("unused") + @Actor + public void incrementTarget() { + underTest.incrementTargetSeqNum(); + } + + @Arbiter + public void captureResult(JJJJ_Result result) { + result.r1 = underTest.getCacheSenderSequence(); + result.r2 = underTest.getCacheTargetSequence(); + + int[] seqs = underTest.getNextMsgSeqNumsFromDb(); + + result.r3 = seqs[0]; + result.r4 = seqs[1]; + + underTest.close(); + } + } + + private static final class JdbcStoreWrapper { + + private final JdbcStore messageStore; + private final Server dbServer; + private final Lock senderSequenceLock; + private final Lock targetSequenceLock; + + public JdbcStoreWrapper(JdbcStore messageStore, Server dbServer) { + this.messageStore = messageStore; + this.dbServer = dbServer; + this.senderSequenceLock = new ReentrantLock(); + this.targetSequenceLock = new ReentrantLock(); + } + + public void incrementSenderSeqNum() { + senderSequenceLock.lock(); + + try { + messageStore.incrNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + senderSequenceLock.unlock(); + } + } + + public void incrementTargetSeqNum() { + targetSequenceLock.lock(); + + try { + messageStore.incrNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + targetSequenceLock.unlock(); + } + } + + public int getCacheSenderSequence() { + try { + return messageStore.getNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getCacheTargetSequence() { + try { + return messageStore.getNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int[] getNextMsgSeqNumsFromDb() { + try { + return messageStore.getNextMsgSeqNumsFromDb(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public void close() { + try { + dbServer.stop(); + } catch (RuntimeException ignored) { + } + } + } +} diff --git a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java new file mode 100644 index 000000000..dcb2dcb11 --- /dev/null +++ b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java @@ -0,0 +1,756 @@ +package quickfix; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.JJJJ_Result; + +import javax.sql.DataSource; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +@SuppressWarnings("unused") +public class JdbcStoreStressTest { + + private static final SessionID SESSION_ID = new SessionID(FixVersions.BEGINSTRING_FIX44, "JDBC_INITIATOR", "JDBC_ACCEPTOR"); + + private static JdbcStoreWrapper createWrapper() throws Exception { + ResultSet columnsResultSet = mock(ResultSet.class); + doReturn(true).when(columnsResultSet).next(); + + SessionSettings settings = new SessionSettings(); + String sessionTableName = JdbcStore.getSessionTableName(settings, SESSION_ID); + String messageTableName = JdbcStore.getMessageTableName(settings, SESSION_ID); + + DatabaseMetaData metaData = mock(DatabaseMetaData.class); + doReturn(columnsResultSet).when(metaData).getColumns(null, null, sessionTableName.toUpperCase(), "SENDERSUBID"); + + Connection connection = mock(Connection.class); + doReturn(metaData).when(connection).getMetaData(); + + DataSource dataSource = mock(DataSource.class); + doReturn(connection).when(dataSource).getConnection(); + + boolean extendedSessionIdSupport = JdbcUtil.determineSessionIdSupport(dataSource, sessionTableName); + String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupport); + String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupport); + String idWhereClause = JdbcUtil.getIDWhereClause(extendedSessionIdSupport); + + // GET SEQUENCE NUMS + ResultSet getSequenceResult = mock(ResultSet.class); + doReturn(false).when(getSequenceResult).next(); + + PreparedStatement getSequenceNumsQuery = mock(PreparedStatement.class); + doReturn(getSequenceResult).when(getSequenceNumsQuery).executeQuery(); + + String sequenceNumsSql = JdbcStore.getSequenceNumsSql(sessionTableName, idWhereClause); + doReturn(getSequenceNumsQuery).when(connection).prepareStatement(sequenceNumsSql); + + // INSERT SESSION + PreparedStatement insertSessionQuery = mock(PreparedStatement.class); + + String insertSessionSql = JdbcStore.getInsertSessionSql(sessionTableName, idColumns, idPlaceholders); + doReturn(insertSessionQuery).when(connection).prepareStatement(insertSessionSql); + + // UPDATE SEQUENCE NUMS + UpdateSequenceStatement updateSequenceNumsQuery = new UpdateSequenceStatement(); + + String updateSequenceNumsSql = JdbcStore.getUpdateSequenceNumsSql(sessionTableName, idWhereClause); + doReturn(updateSequenceNumsQuery).when(connection).prepareStatement(updateSequenceNumsSql); + + JdbcStore jdbcStore = new JdbcStore(settings, SESSION_ID, dataSource); + + if (jdbcStore.getNextSenderMsgSeqNum() != 1) { + throw new IllegalStateException("Invalid next sender sequence: " + jdbcStore.getNextSenderMsgSeqNum()); + } + + if (jdbcStore.getNextTargetMsgSeqNum() != 1) { + throw new IllegalStateException("Invalid next target sequence: " + jdbcStore.getNextTargetMsgSeqNum()); + } + + return new JdbcStoreWrapper(jdbcStore, updateSequenceNumsQuery); + } + + @State + @JCStressTest + @Outcome(id = "2, 2, 2, 2", expect = Expect.ACCEPTABLE) + public static class SingleSenderSequenceTest { + + private final JdbcStoreWrapper underTest; + + public SingleSenderSequenceTest() { + try { + this.underTest = createWrapper(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // application thread + @SuppressWarnings("unused") + @Actor + public void incrementSender() { + underTest.incrementSenderSeqNum(); + } + + // QFJ Message Processor + @SuppressWarnings("unused") + @Actor + public void incrementTarget() { + underTest.incrementTargetSeqNum(); + } + + @Arbiter + public void captureResult(JJJJ_Result result) { + result.r1 = underTest.getCacheSenderSequence(); + result.r2 = underTest.getCacheTargetSequence(); + result.r3 = underTest.getDbSenderSequence(); + result.r4 = underTest.getDbTargetSequence(); + } + } + + @State + @JCStressTest + @Outcome(id = "3, 2, 3, 2", expect = Expect.ACCEPTABLE) + public static class TwoSendersSequenceTest { + + private final JdbcStoreWrapper underTest; + + public TwoSendersSequenceTest() { + try { + this.underTest = createWrapper(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // application thread + @SuppressWarnings("unused") + @Actor + public void incrementSender1() { + underTest.incrementSenderSeqNum(); + } + + // application thread + @SuppressWarnings("unused") + @Actor + public void incrementSender2() { + underTest.incrementSenderSeqNum(); + } + + // QFJ Message Processor + @SuppressWarnings("unused") + @Actor + public void incrementTarget() { + underTest.incrementTargetSeqNum(); + } + + @Arbiter + public void captureResult(JJJJ_Result result) { + result.r1 = underTest.getCacheSenderSequence(); + result.r2 = underTest.getCacheTargetSequence(); + result.r3 = underTest.getDbSenderSequence(); + result.r4 = underTest.getDbTargetSequence(); + } + } + + private static final class JdbcStoreWrapper { + + private final JdbcStore messageStore; + private final UpdateSequenceStatement updateSequenceNumsQuery; + private final Lock senderSequenceLock; + private final Lock targetSequenceLock; + + public JdbcStoreWrapper(JdbcStore messageStore, UpdateSequenceStatement updateSequenceNumsQuery) { + this.messageStore = messageStore; + this.updateSequenceNumsQuery = updateSequenceNumsQuery; + this.senderSequenceLock = new ReentrantLock(); + this.targetSequenceLock = new ReentrantLock(); + } + + public void incrementSenderSeqNum() { + senderSequenceLock.lock(); + + try { + messageStore.incrNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + senderSequenceLock.unlock(); + } + } + + public void incrementTargetSeqNum() { + targetSequenceLock.lock(); + + try { + messageStore.incrNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + targetSequenceLock.unlock(); + } + } + + public int getCacheSenderSequence() { + try { + return messageStore.getNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getCacheTargetSequence() { + try { + return messageStore.getNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getDbSenderSequence() { + return updateSequenceNumsQuery.senderSeqNum; + } + + public int getDbTargetSequence() { + return updateSequenceNumsQuery.targetSeqNum; + } + } + + private static final class UpdateSequenceStatement implements PreparedStatement { + + private int senderSeqNum; + private int targetSeqNum; + + public UpdateSequenceStatement() { + this.senderSeqNum = -1; + this.targetSeqNum = -1; + } + + @Override + public ResultSet executeQuery() { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate() { + throw new UnsupportedOperationException(); + } + + @Override + public void setNull(int parameterIndex, int sqlType) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBoolean(int parameterIndex, boolean x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setByte(int parameterIndex, byte x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setShort(int parameterIndex, short x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setInt(int parameterIndex, int x) { + if (parameterIndex == 1) { + targetSeqNum = x; + } else if (parameterIndex == 2) { + senderSeqNum = x; + } + } + + @Override + public void setLong(int parameterIndex, long x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFloat(int parameterIndex, float x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDouble(int parameterIndex, double x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setString(int parameterIndex, String x) { + } + + @Override + public void setBytes(int parameterIndex, byte[] x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDate(int parameterIndex, Date x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTime(int parameterIndex, Time x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void clearParameters() { + throw new UnsupportedOperationException(); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) { + throw new UnsupportedOperationException(); + } + + @Override + public void setObject(int parameterIndex, Object x) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute() { + return true; + } + + @Override + public void addBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setRef(int parameterIndex, Ref x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBlob(int parameterIndex, Blob x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setClob(int parameterIndex, Clob x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setArray(int parameterIndex, Array x) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSetMetaData getMetaData() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) { + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) { + throw new UnsupportedOperationException(); + } + + @Override + public void setURL(int parameterIndex, URL x) { + throw new UnsupportedOperationException(); + } + + @Override + public ParameterMetaData getParameterMetaData() { + throw new UnsupportedOperationException(); + } + + @Override + public void setRowId(int parameterIndex, RowId x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNString(int parameterIndex, String value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNClob(int parameterIndex, NClob value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setClob(int parameterIndex, Reader reader, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNClob(int parameterIndex, Reader reader, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML xmlObject) { + throw new UnsupportedOperationException(); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) { + throw new UnsupportedOperationException(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) { + throw new UnsupportedOperationException(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setClob(int parameterIndex, Reader reader) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNClob(int parameterIndex, Reader reader) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSet executeQuery(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + } + + @Override + public int getMaxFieldSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void setMaxFieldSize(int max) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxRows() { + throw new UnsupportedOperationException(); + } + + @Override + public void setMaxRows(int max) { + throw new UnsupportedOperationException(); + } + + @Override + public void setEscapeProcessing(boolean enable) { + throw new UnsupportedOperationException(); + } + + @Override + public int getQueryTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setQueryTimeout(int seconds) { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel() { + throw new UnsupportedOperationException(); + } + + @Override + public SQLWarning getWarnings() { + throw new UnsupportedOperationException(); + } + + @Override + public void clearWarnings() { + throw new UnsupportedOperationException(); + } + + @Override + public void setCursorName(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSet getResultSet() { + throw new UnsupportedOperationException(); + } + + @Override + public int getUpdateCount() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getMoreResults() { + throw new UnsupportedOperationException(); + } + + @Override + public T unwrap(Class iface) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWrapperFor(Class iface) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFetchDirection(int direction) { + throw new UnsupportedOperationException(); + } + + @Override + public int getFetchDirection() { + throw new UnsupportedOperationException(); + } + + @Override + public void setFetchSize(int rows) { + throw new UnsupportedOperationException(); + } + + @Override + public int getFetchSize() { + throw new UnsupportedOperationException(); + } + + @Override + public int getResultSetConcurrency() { + throw new UnsupportedOperationException(); + } + + @Override + public int getResultSetType() { + throw new UnsupportedOperationException(); + } + + @Override + public void addBatch(String sql) { + throw new UnsupportedOperationException(); + } + + @Override + public void clearBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public int[] executeBatch() { + throw new UnsupportedOperationException(); + } + + @Override + public Connection getConnection() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getMoreResults(int current) { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSet getGeneratedKeys() { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) { + throw new UnsupportedOperationException(); + } + + @Override + public int executeUpdate(String sql, String[] columnNames) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql, int[] columnIndexes) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean execute(String sql, String[] columnNames) { + throw new UnsupportedOperationException(); + } + + @Override + public int getResultSetHoldability() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isClosed() { + throw new UnsupportedOperationException(); + } + + @Override + public void setPoolable(boolean poolable) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isPoolable() { + throw new UnsupportedOperationException(); + } + + @Override + public void closeOnCompletion() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCloseOnCompletion() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java b/quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java new file mode 100644 index 000000000..7d2eec066 --- /dev/null +++ b/quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java @@ -0,0 +1,77 @@ +package quickfix; + +import javax.sql.DataSource; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +public final class StressTestDbUtil { + + private static final int BUFFER_LENGTH = 4096; + + public static void initTables(DataSource dataSource) throws SQLException, IOException { + try (Connection connection = dataSource.getConnection()) { + executeSql(connection, "config/sql/hsqldb/sessions_table.sql", new HypersonicPreprocessor("sessions")::preprocessSQL); + executeSql(connection, "config/sql/hsqldb/messages_table.sql", new HypersonicPreprocessor("messages")::preprocessSQL); + } + } + + private static void executeSql(Connection connection, String sqlResource, Function preprocessor) throws IOException, SQLException { + try (InputStream in = JdbcStoreHsqldbStressTest.class.getClassLoader().getResourceAsStream(sqlResource)) { + String sql = readFully(requireNonNull(in)); + sql = preprocessor.apply(sql); + + try (Statement statement = connection.createStatement()) { + statement.execute(sql); + } + } + } + + private static String readFully(InputStream in) throws IOException { + StringBuilder out = new StringBuilder(in.available()); + char[] buffer = new char[BUFFER_LENGTH]; + + try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) { + int readCharCount; + + while ((readCharCount = reader.read(buffer)) != -1) { + out.append(buffer, 0, readCharCount); + } + } + + return out.toString(); + } + + // this is copied from quickfixj-core test cope - quickfix.JdbcTestSupport.HypersonicPreprocessor + // TODO move to a common place and make visible for both modules + private static final class HypersonicPreprocessor { + + private final String tableName; + + public HypersonicPreprocessor(String tableName) { + this.tableName = tableName; + } + + public String preprocessSQL(String sql) { + sql = sql.replaceAll("USE .*;", ""); + sql = sql.replaceAll(" UNSIGNED", ""); + sql = sql.replaceAll("AUTO_INCREMENT", "IDENTITY"); + sql = sql.replaceAll("TEXT", "VARCHAR(256)"); + + if (tableName != null) { + sql = sql.replaceAll("CREATE TABLE [a-z]+", "CREATE TABLE " + tableName); + sql = sql.replaceAll("DROP TABLE [a-z]+", "DROP TABLE " + tableName); + sql = sql.replaceAll("DELETE FROM [a-z]+", "DELETE FROM " + tableName); + } + + return sql; + } + } +} From e8452ca00222398a118de4cc131cacc30c2bbc0d Mon Sep 17 00:00:00 2001 From: Marcin Date: Tue, 30 Sep 2025 12:21:00 +0200 Subject: [PATCH 2/2] JDBC store testing with new jcstress suite --- .../src/main/java/quickfix/JdbcStore.java | 26 --- quickfixj-stress-test/README.md | 11 +- quickfixj-stress-test/pom.xml | 5 - .../quickfix/JdbcStoreHsqldbStressTest.java | 214 ------------------ .../java/quickfix/JdbcStoreStressTest.java | 115 +++++++--- .../java/quickfix/MemoryStoreStressTest.java | 144 ++++++++++++ .../main/java/quickfix/StressTestDbUtil.java | 77 ------- 7 files changed, 230 insertions(+), 362 deletions(-) delete mode 100644 quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java create mode 100644 quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java delete mode 100644 quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java diff --git a/quickfixj-core/src/main/java/quickfix/JdbcStore.java b/quickfixj-core/src/main/java/quickfix/JdbcStore.java index 1d2eee8e9..0087e9de8 100644 --- a/quickfixj-core/src/main/java/quickfix/JdbcStore.java +++ b/quickfixj-core/src/main/java/quickfix/JdbcStore.java @@ -194,32 +194,6 @@ public int getNextTargetMsgSeqNum() throws IOException { return cache.getNextTargetMsgSeqNum(); } - int getNextSenderMsgSeqNumFromDb() throws SQLException { - return getNextMsgSeqNumsFromDb()[0]; - } - - int getNextTargetMsgSeqNumFromDb() throws SQLException { - return getNextMsgSeqNumsFromDb()[1]; - } - - int[] getNextMsgSeqNumsFromDb() throws SQLException { - try (Connection connection = dataSource.getConnection()) { - try (PreparedStatement query = connection.prepareStatement(SQL_GET_SEQNUMS)) { - setSessionIdParameters(query, 1); - - try (ResultSet result = query.executeQuery()) { - if (result.next()) { - int targetSeqNum = result.getInt(2); - int senderSeqNum = result.getInt(3); - return new int[] {senderSeqNum, targetSeqNum}; - } else { - return new int[] {-1, -1}; - } - } - } - } - } - public void incrNextSenderMsgSeqNum() throws IOException { cache.incrNextSenderMsgSeqNum(); setNextSenderMsgSeqNum(cache.getNextSenderMsgSeqNum()); diff --git a/quickfixj-stress-test/README.md b/quickfixj-stress-test/README.md index 42db2f780..179fb1bd5 100644 --- a/quickfixj-stress-test/README.md +++ b/quickfixj-stress-test/README.md @@ -60,15 +60,13 @@ $ java -jar quickfixj-stress-test/target/quickfixj-stress-test.jar -h Some useful jcstress options include: +- `h` - Display help - `-t ` - Run specific test or test group - `-v` - Verbose mode -- `-time ` - Duration for each test iteration -- `-iters ` - Number of iterations - `-jvmArgs ` - Additional JVM arguments ## Additional Notes -- Test results are generated in the `results/` directory by default - For CI/CD integration, use the full path to the JAR file - Stress tests may take significant time to complete depending on your system resources - Review jcstress documentation for advanced configuration options @@ -77,7 +75,6 @@ Some useful jcstress options include: If you encounter issues: -1. Ensure all prerequisites are met -2. Verify the JAR file was built successfully in `quickfixj-stress-test/target/` -3. Check that you have sufficient system resources (CPU, memory) -4. Review the jcstress output for specific error messages \ No newline at end of file +- Verify the JAR file was built successfully in `quickfixj-stress-test/target/` +- Check that you have sufficient system resources (CPU, memory) +- Review the jcstress output for specific error messages \ No newline at end of file diff --git a/quickfixj-stress-test/pom.xml b/quickfixj-stress-test/pom.xml index 7d8e05e57..7b2738411 100644 --- a/quickfixj-stress-test/pom.xml +++ b/quickfixj-stress-test/pom.xml @@ -28,11 +28,6 @@ jcstress-core ${jcstress.version} - - hsqldb - hsqldb - ${hsqldb.version} - org.slf4j slf4j-jdk14 diff --git a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java deleted file mode 100644 index 04eec08db..000000000 --- a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreHsqldbStressTest.java +++ /dev/null @@ -1,214 +0,0 @@ -package quickfix; - -import org.apache.mina.util.AvailablePortFinder; -import org.hsqldb.Server; -import org.openjdk.jcstress.annotations.Actor; -import org.openjdk.jcstress.annotations.Arbiter; -import org.openjdk.jcstress.annotations.Expect; -import org.openjdk.jcstress.annotations.JCStressTest; -import org.openjdk.jcstress.annotations.Outcome; -import org.openjdk.jcstress.annotations.State; -import org.openjdk.jcstress.infra.results.JJJJ_Result; - -import javax.sql.DataSource; -import java.io.IOException; -import java.sql.SQLException; -import java.sql.Wrapper; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -@SuppressWarnings("unused") -public class JdbcStoreHsqldbStressTest { - - private static final SessionID SESSION_ID = new SessionID(FixVersions.BEGINSTRING_FIX44, "JDBC_INITIATOR", "JDBC_ACCEPTOR"); - - private static JdbcStoreWrapper createWrapper() throws Exception { - int dbPort = AvailablePortFinder.getNextAvailable(); - - Server dbServer = new Server(); - dbServer.setDatabaseName(0, "quickfix-jdbc-test"); - dbServer.setDatabasePath(0, "mem:quickfix-jdbc-test"); - dbServer.setAddress("127.0.0.1"); - dbServer.setPort(dbPort); - dbServer.start(); - - String connectionUrl = "jdbc:hsqldb:hsql://127.0.0.1:" + dbPort + "/quickfix-jdbc-test"; - - SessionSettings settings = new SessionSettings(); - settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_DRIVER, "org.hsqldb.jdbcDriver"); - settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_CONNECTION_URL, connectionUrl); - settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_USER, "SA"); - settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_PASSWORD, ""); - settings.setString(SESSION_ID, JdbcSetting.SETTING_JDBC_CONNECTION_TEST_QUERY, "CALL NOW()"); - - DataSource dataSource = JdbcUtil.getDataSource(settings, SESSION_ID); - StressTestDbUtil.initTables(dataSource); - - JdbcStore store = new JdbcStore(settings, SESSION_ID, null); - - return new JdbcStoreWrapper(store, dbServer); - } - - @State - @JCStressTest - @Outcome(id = "2, 2, 2, 2", expect = Expect.ACCEPTABLE) - public static class SingleSenderSequenceTest { - - private final JdbcStoreWrapper underTest; - - public SingleSenderSequenceTest() { - try { - this.underTest = createWrapper(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // application thread - @SuppressWarnings("unused") - @Actor - public void incrementSender() { - underTest.incrementSenderSeqNum(); - } - - // QFJ Message Processor - @SuppressWarnings("unused") - @Actor - public void incrementTarget() { - underTest.incrementTargetSeqNum(); - } - - @Arbiter - public void captureResult(JJJJ_Result result) { - result.r1 = underTest.getCacheSenderSequence(); - result.r2 = underTest.getCacheTargetSequence(); - - int[] seqs = underTest.getNextMsgSeqNumsFromDb(); - - result.r3 = seqs[0]; - result.r4 = seqs[1]; - - underTest.close(); - } - } - - @State - @JCStressTest - @Outcome(id = "3, 2, 3, 2", expect = Expect.ACCEPTABLE) - public static class TwoSendersSequenceTest { - - private final JdbcStoreWrapper underTest; - - public TwoSendersSequenceTest() { - try { - this.underTest = createWrapper(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // application thread - @SuppressWarnings("unused") - @Actor - public void incrementSender1() { - underTest.incrementSenderSeqNum(); - } - - // application thread - @SuppressWarnings("unused") - @Actor - public void incrementSender2() { - underTest.incrementSenderSeqNum(); - } - - // QFJ Message Processor - @SuppressWarnings("unused") - @Actor - public void incrementTarget() { - underTest.incrementTargetSeqNum(); - } - - @Arbiter - public void captureResult(JJJJ_Result result) { - result.r1 = underTest.getCacheSenderSequence(); - result.r2 = underTest.getCacheTargetSequence(); - - int[] seqs = underTest.getNextMsgSeqNumsFromDb(); - - result.r3 = seqs[0]; - result.r4 = seqs[1]; - - underTest.close(); - } - } - - private static final class JdbcStoreWrapper { - - private final JdbcStore messageStore; - private final Server dbServer; - private final Lock senderSequenceLock; - private final Lock targetSequenceLock; - - public JdbcStoreWrapper(JdbcStore messageStore, Server dbServer) { - this.messageStore = messageStore; - this.dbServer = dbServer; - this.senderSequenceLock = new ReentrantLock(); - this.targetSequenceLock = new ReentrantLock(); - } - - public void incrementSenderSeqNum() { - senderSequenceLock.lock(); - - try { - messageStore.incrNextSenderMsgSeqNum(); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - senderSequenceLock.unlock(); - } - } - - public void incrementTargetSeqNum() { - targetSequenceLock.lock(); - - try { - messageStore.incrNextTargetMsgSeqNum(); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - targetSequenceLock.unlock(); - } - } - - public int getCacheSenderSequence() { - try { - return messageStore.getNextSenderMsgSeqNum(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public int getCacheTargetSequence() { - try { - return messageStore.getNextTargetMsgSeqNum(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public int[] getNextMsgSeqNumsFromDb() { - try { - return messageStore.getNextMsgSeqNumsFromDb(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public void close() { - try { - dbServer.stop(); - } catch (RuntimeException ignored) { - } - } - } -} diff --git a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java index dcb2dcb11..97f3fab22 100644 --- a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java +++ b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -82,10 +83,10 @@ private static JdbcStoreWrapper createWrapper() throws Exception { doReturn(insertSessionQuery).when(connection).prepareStatement(insertSessionSql); // UPDATE SEQUENCE NUMS - UpdateSequenceStatement updateSequenceNumsQuery = new UpdateSequenceStatement(); + Database database = new Database(); String updateSequenceNumsSql = JdbcStore.getUpdateSequenceNumsSql(sessionTableName, idWhereClause); - doReturn(updateSequenceNumsQuery).when(connection).prepareStatement(updateSequenceNumsSql); + doAnswer(invocationOnMock -> new UpdateSequenceStatement(database)).when(connection).prepareStatement(updateSequenceNumsSql); JdbcStore jdbcStore = new JdbcStore(settings, SESSION_ID, dataSource); @@ -97,12 +98,27 @@ private static JdbcStoreWrapper createWrapper() throws Exception { throw new IllegalStateException("Invalid next target sequence: " + jdbcStore.getNextTargetMsgSeqNum()); } - return new JdbcStoreWrapper(jdbcStore, updateSequenceNumsQuery); + return new JdbcStoreWrapper(jdbcStore, database); } + /** + *
+     *   JVM args: [-Dfile.encoding=UTF-8, -XX:-UseBiasedLocking, -XX:+StressLCM, -XX:+StressGCM]
+     *
+     *       RESULT  SAMPLES     FREQ      EXPECT  DESCRIPTION
+     *   2, 1, 2, 1        0    0,00%   Forbidden  Invalid source and target sequences stored in the database
+     *   2, 1, 2, 2        0    0,00%   Forbidden  Invalid source sequence stored in the database
+     *   2, 2, 2, 1        1   <0,01%   Forbidden  Invalid target sequence stored in the database
+     *   2, 2, 2, 2   30,886  100,00%  Acceptable
+     *
+     * 
+ */ @State @JCStressTest @Outcome(id = "2, 2, 2, 2", expect = Expect.ACCEPTABLE) + @Outcome(id = "2, 2, 2, 1", expect = Expect.FORBIDDEN, desc = "Invalid target sequence stored in the database") + @Outcome(id = "2, 1, 2, 2", expect = Expect.FORBIDDEN, desc = "Invalid source sequence stored in the database") + @Outcome(id = "2, 1, 2, 1", expect = Expect.FORBIDDEN, desc = "Invalid source and target sequences stored in the database") public static class SingleSenderSequenceTest { private final JdbcStoreWrapper underTest; @@ -116,17 +132,15 @@ public SingleSenderSequenceTest() { } // application thread - @SuppressWarnings("unused") @Actor public void incrementSender() { - underTest.incrementSenderSeqNum(); + underTest.incrementSenderSequence(); } // QFJ Message Processor - @SuppressWarnings("unused") @Actor public void incrementTarget() { - underTest.incrementTargetSeqNum(); + underTest.incrementTargetSequence(); } @Arbiter @@ -138,9 +152,20 @@ public void captureResult(JJJJ_Result result) { } } + /** + *
+     *   JVM args: [-Dfile.encoding=UTF-8, -XX:-UseBiasedLocking, -XX:+StressLCM, -XX:+StressGCM]
+     *
+     *       RESULT  SAMPLES     FREQ      EXPECT  DESCRIPTION
+     *   3, 2, 2, 2        1   <0,01%   Forbidden
+     *   3, 2, 3, 2   46,502  100,00%  Acceptable
+     *
+     * 
+ */ @State @JCStressTest @Outcome(id = "3, 2, 3, 2", expect = Expect.ACCEPTABLE) + @Outcome(expect = Expect.FORBIDDEN) public static class TwoSendersSequenceTest { private final JdbcStoreWrapper underTest; @@ -154,24 +179,21 @@ public TwoSendersSequenceTest() { } // application thread - @SuppressWarnings("unused") @Actor public void incrementSender1() { - underTest.incrementSenderSeqNum(); + underTest.incrementSenderSequence(); } // application thread - @SuppressWarnings("unused") @Actor public void incrementSender2() { - underTest.incrementSenderSeqNum(); + underTest.incrementSenderSequence(); } // QFJ Message Processor - @SuppressWarnings("unused") @Actor public void incrementTarget() { - underTest.incrementTargetSeqNum(); + underTest.incrementTargetSequence(); } @Arbiter @@ -185,23 +207,23 @@ public void captureResult(JJJJ_Result result) { private static final class JdbcStoreWrapper { - private final JdbcStore messageStore; - private final UpdateSequenceStatement updateSequenceNumsQuery; + private final JdbcStore store; + private final Database database; private final Lock senderSequenceLock; private final Lock targetSequenceLock; - public JdbcStoreWrapper(JdbcStore messageStore, UpdateSequenceStatement updateSequenceNumsQuery) { - this.messageStore = messageStore; - this.updateSequenceNumsQuery = updateSequenceNumsQuery; + public JdbcStoreWrapper(JdbcStore store, Database database) { + this.store = store; + this.database = database; this.senderSequenceLock = new ReentrantLock(); this.targetSequenceLock = new ReentrantLock(); } - public void incrementSenderSeqNum() { + public void incrementSenderSequence() { senderSequenceLock.lock(); try { - messageStore.incrNextSenderMsgSeqNum(); + store.incrNextSenderMsgSeqNum(); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -209,11 +231,11 @@ public void incrementSenderSeqNum() { } } - public void incrementTargetSeqNum() { + public void incrementTargetSequence() { targetSequenceLock.lock(); try { - messageStore.incrNextTargetMsgSeqNum(); + store.incrNextTargetMsgSeqNum(); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -223,7 +245,7 @@ public void incrementTargetSeqNum() { public int getCacheSenderSequence() { try { - return messageStore.getNextSenderMsgSeqNum(); + return store.getNextSenderMsgSeqNum(); } catch (IOException e) { throw new RuntimeException(e); } @@ -231,29 +253,55 @@ public int getCacheSenderSequence() { public int getCacheTargetSequence() { try { - return messageStore.getNextTargetMsgSeqNum(); + return store.getNextTargetMsgSeqNum(); } catch (IOException e) { throw new RuntimeException(e); } } public int getDbSenderSequence() { - return updateSequenceNumsQuery.senderSeqNum; + return database.senderSequence; } public int getDbTargetSequence() { - return updateSequenceNumsQuery.targetSeqNum; + return database.targetSequence; + } + } + + private static final class Database { + + private final Lock lock; + private int senderSequence; + private int targetSequence; + + public Database() { + this.lock = new ReentrantLock(); + this.senderSequence = -1; + this.targetSequence = -1; + } + + public void update(int senderSequence, int targetSequence) { + lock.lock(); + + try { + this.senderSequence = senderSequence; + this.targetSequence = targetSequence; + } finally { + lock.unlock(); + } } } private static final class UpdateSequenceStatement implements PreparedStatement { - private int senderSeqNum; - private int targetSeqNum; + private final Database database; + private int senderSequence; + private int targetSequence; - public UpdateSequenceStatement() { - this.senderSeqNum = -1; - this.targetSeqNum = -1; + public UpdateSequenceStatement(Database database) { + this.database = database; + this.senderSequence = -1; + this.targetSequence = -1; } @Override @@ -289,9 +337,9 @@ public void setShort(int parameterIndex, short x) { @Override public void setInt(int parameterIndex, int x) { if (parameterIndex == 1) { - targetSeqNum = x; + targetSequence = x; } else if (parameterIndex == 2) { - senderSeqNum = x; + senderSequence = x; } } @@ -371,6 +419,7 @@ public void setObject(int parameterIndex, Object x) { @Override public boolean execute() { + database.update(senderSequence, targetSequence); return true; } diff --git a/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java new file mode 100644 index 000000000..1ec31a053 --- /dev/null +++ b/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java @@ -0,0 +1,144 @@ +package quickfix; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.JJ_Result; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@SuppressWarnings("unused") +public class MemoryStoreStressTest { + + @State + @JCStressTest + @Outcome(id = "2, 2", expect = Expect.ACCEPTABLE) + @Outcome(id = "1, 2", expect = Expect.FORBIDDEN, desc = "Source sequence update lost") + @Outcome(id = "2, 1", expect = Expect.FORBIDDEN, desc = "Target sequence update lost") + public static class SingleSenderSequenceTest { + + private final MemoryStoreWrapper underTest; + + public SingleSenderSequenceTest() { + this.underTest = new MemoryStoreWrapper(); + } + + // application thread + @Actor + public void incrementSender() { + underTest.incrementSenderSequence(); + } + + // QFJ Message Processor + @Actor + public void incrementTarget() { + underTest.incrementTargetSequence(); + } + + @Arbiter + public void captureResult(JJ_Result result) { + result.r1 = underTest.getSenderSequence(); + result.r2 = underTest.getTargetSequence(); + } + } + + @State + @JCStressTest + @Outcome(id = "3, 2", expect = Expect.ACCEPTABLE) + @Outcome(expect = Expect.FORBIDDEN) + public static class TwoSendersSequenceTest { + + private final MemoryStoreWrapper underTest; + + public TwoSendersSequenceTest() { + this.underTest = new MemoryStoreWrapper(); + } + + // application thread + @Actor + public void incrementSender1() { + underTest.incrementSenderSequence(); + } + + // application thread + @Actor + public void incrementSender2() { + underTest.incrementSenderSequence(); + } + + // QFJ Message Processor + @Actor + public void incrementTarget() { + underTest.incrementTargetSequence(); + } + + @Arbiter + public void captureResult(JJ_Result result) { + result.r1 = underTest.getSenderSequence(); + result.r2 = underTest.getTargetSequence(); + } + } + + private static final class MemoryStoreWrapper { + + private final MemoryStore store; + private final Lock senderSequenceLock; + private final Lock targetSequenceLock; + + public MemoryStoreWrapper() { + try { + this.store = new MemoryStore(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + this.senderSequenceLock = new ReentrantLock(); + this.targetSequenceLock = new ReentrantLock(); + } + + public void incrementSenderSequence() { + senderSequenceLock.lock(); + + try { + store.incrNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + senderSequenceLock.unlock(); + } + } + + public void incrementTargetSequence() { + targetSequenceLock.lock(); + + try { + store.incrNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + targetSequenceLock.unlock(); + } + } + + public int getSenderSequence() { + try { + return store.getNextSenderMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getTargetSequence() { + try { + return store.getNextTargetMsgSeqNum(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java b/quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java deleted file mode 100644 index 7d2eec066..000000000 --- a/quickfixj-stress-test/src/main/java/quickfix/StressTestDbUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package quickfix; - -import javax.sql.DataSource; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.function.Function; - -import static java.util.Objects.requireNonNull; - -public final class StressTestDbUtil { - - private static final int BUFFER_LENGTH = 4096; - - public static void initTables(DataSource dataSource) throws SQLException, IOException { - try (Connection connection = dataSource.getConnection()) { - executeSql(connection, "config/sql/hsqldb/sessions_table.sql", new HypersonicPreprocessor("sessions")::preprocessSQL); - executeSql(connection, "config/sql/hsqldb/messages_table.sql", new HypersonicPreprocessor("messages")::preprocessSQL); - } - } - - private static void executeSql(Connection connection, String sqlResource, Function preprocessor) throws IOException, SQLException { - try (InputStream in = JdbcStoreHsqldbStressTest.class.getClassLoader().getResourceAsStream(sqlResource)) { - String sql = readFully(requireNonNull(in)); - sql = preprocessor.apply(sql); - - try (Statement statement = connection.createStatement()) { - statement.execute(sql); - } - } - } - - private static String readFully(InputStream in) throws IOException { - StringBuilder out = new StringBuilder(in.available()); - char[] buffer = new char[BUFFER_LENGTH]; - - try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) { - int readCharCount; - - while ((readCharCount = reader.read(buffer)) != -1) { - out.append(buffer, 0, readCharCount); - } - } - - return out.toString(); - } - - // this is copied from quickfixj-core test cope - quickfix.JdbcTestSupport.HypersonicPreprocessor - // TODO move to a common place and make visible for both modules - private static final class HypersonicPreprocessor { - - private final String tableName; - - public HypersonicPreprocessor(String tableName) { - this.tableName = tableName; - } - - public String preprocessSQL(String sql) { - sql = sql.replaceAll("USE .*;", ""); - sql = sql.replaceAll(" UNSIGNED", ""); - sql = sql.replaceAll("AUTO_INCREMENT", "IDENTITY"); - sql = sql.replaceAll("TEXT", "VARCHAR(256)"); - - if (tableName != null) { - sql = sql.replaceAll("CREATE TABLE [a-z]+", "CREATE TABLE " + tableName); - sql = sql.replaceAll("DROP TABLE [a-z]+", "DROP TABLE " + tableName); - sql = sql.replaceAll("DELETE FROM [a-z]+", "DELETE FROM " + tableName); - } - - return sql; - } - } -}