diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..a3721f9
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,48 @@
+
+
+ 4.0.0
+
+ tokutek
+ sysbench-mongodb
+ 0.0.1-SNAPSHOT
+ MongoDB sysbench
+
+
+ target
+ target/classes
+ ${project.artifactId}-${project.version}
+ src
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 1.3.2
+
+
+
+ java
+
+
+
+
+
+
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ 3.1.0
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.7
+
+
+ org.mongodb
+ mongo-java-driver
+ 2.13.0-rc0
+
+
+
diff --git a/run.simple.bash b/run.simple.bash
index d7a644a..d84a662 100755
--- a/run.simple.bash
+++ b/run.simple.bash
@@ -20,22 +20,21 @@ else
exit 1
fi
-javac -cp $CLASSPATH:$PWD/src src/jmongosysbenchload.java
-javac -cp $CLASSPATH:$PWD/src src/jmongosysbenchexecute.java
-
+mvn compile
# load the data
if [[ $DOLOAD = "yes" ]]; then
echo Do load at $( date )
export LOG_NAME=mongoSysbenchLoad-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_LOADER_THREADS}.txt
- export BENCHMARK_TSV=${LOG_NAME}.tsv
+ export BENCHMARK_CSV_DIR=mongoSysbenchLoad-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_LOADER_THREADS}-csv
rm -f $LOG_NAME
- rm -f $BENCHMARK_TSV
+ rm -rf $BENCHMARK_CSV_DIR
+ mkdir $BENCHMARK_CSV_DIR
T="$(date +%s)"
- java -cp $CLASSPATH:$PWD/src jmongosysbenchload $NUM_COLLECTIONS $DB_NAME $NUM_LOADER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $MONGO_COMPRESSION $MONGO_BASEMENT $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT $USERNAME $PASSWORD
+ mvn exec:java -Dexec.mainClass=jmongosysbenchload -Dexec.args="$NUM_COLLECTIONS $DB_NAME $NUM_LOADER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_CSV_DIR $MONGO_COMPRESSION $MONGO_BASEMENT $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT $USERNAME $PASSWORD"
echo "" | tee -a $LOG_NAME
T="$(($(date +%s)-T))"
printf "`date` | sysbench loader duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME
@@ -47,13 +46,14 @@ fi
if [[ $DOQUERY = "yes" ]]; then
echo Do query at $( date )
export LOG_NAME=mongoSysbenchExecute-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_WRITER_THREADS}.txt
- export BENCHMARK_TSV=${LOG_NAME}.tsv
+ export BENCHMARK_CSV_DIR=mongoSysbenchExecute-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_WRITER_THREADS}-csv
rm -f $LOG_NAME
- rm -f $BENCHMARK_TSV
+ rm -rf $BENCHMARK_CSV_DIR
+ mkdir $BENCHMARK_CSV_DIR
T="$(date +%s)"
- java -cp $CLASSPATH:$PWD/src jmongosysbenchexecute $NUM_COLLECTIONS $DB_NAME $NUM_WRITER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $SYSBENCH_AUTO_COMMIT $RUN_TIME_SECONDS $SYSBENCH_RANGE_SIZE $SYSBENCH_POINT_SELECTS $SYSBENCH_SIMPLE_RANGES $SYSBENCH_SUM_RANGES $SYSBENCH_ORDER_RANGES $SYSBENCH_DISTINCT_RANGES $SYSBENCH_INDEX_UPDATES $SYSBENCH_NON_INDEX_UPDATES $SYSBENCH_INSERTS $WRITE_CONCERN $MAX_TPS $MONGO_SERVER $MONGO_PORT $SEED $USERNAME $PASSWORD | tee -a $LOG_NAME
+ mvn exec:java -Dexec.mainClass=jmongosysbenchexecute -Dexec.args="$NUM_COLLECTIONS $DB_NAME $NUM_WRITER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_CSV_DIR $SYSBENCH_AUTO_COMMIT $RUN_TIME_SECONDS $SYSBENCH_RANGE_SIZE $SYSBENCH_POINT_SELECTS $SYSBENCH_SIMPLE_RANGES $SYSBENCH_SUM_RANGES $SYSBENCH_ORDER_RANGES $SYSBENCH_DISTINCT_RANGES $SYSBENCH_INDEX_UPDATES $SYSBENCH_NON_INDEX_UPDATES $SYSBENCH_INSERTS $WRITE_CONCERN $MAX_TPS $MONGO_SERVER $MONGO_PORT $SEED $USERNAME $PASSWORD" | tee -a $LOG_NAME
echo "" | tee -a $LOG_NAME
T="$(($(date +%s)-T))"
printf "`date` | sysbench benchmark duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME
diff --git a/src/jmongosysbenchexecute.java b/src/jmongosysbenchexecute.java
index bf35445..8516892 100644
--- a/src/jmongosysbenchexecute.java
+++ b/src/jmongosysbenchexecute.java
@@ -1,3 +1,8 @@
+import com.codahale.metrics.*;
+import org.apache.log4j.BasicConfigurator;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
//import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
@@ -30,12 +35,14 @@
import java.util.concurrent.locks.ReentrantLock;
public class jmongosysbenchexecute {
- public static AtomicLong globalInserts = new AtomicLong(0);
- public static AtomicLong globalDeletes = new AtomicLong(0);
- public static AtomicLong globalUpdates = new AtomicLong(0);
- public static AtomicLong globalPointQueries = new AtomicLong(0);
- public static AtomicLong globalRangeQueries = new AtomicLong(0);
- public static AtomicLong globalSysbenchTransactions = new AtomicLong(0);
+ static final MetricRegistry metrics = new MetricRegistry();
+ private final Timer insertLatencies = metrics.timer(MetricRegistry.name("sysbench", "inserts"));
+ private final Timer deleteLatencies = metrics.timer(MetricRegistry.name("sysbench", "deletes"));
+ private final Timer updateLatencies = metrics.timer(MetricRegistry.name("sysbench", "updates"));
+ private final Timer pointQueryLatencies = metrics.timer(MetricRegistry.name("sysbench", "ptqueries"));
+ private final Timer rangeQueryLatencies = metrics.timer(MetricRegistry.name("sysbench", "rgqueries"));
+ private final Timer globalSysbenchTransactions = metrics.timer(MetricRegistry.name("sysbench", "tps"));
+
public static AtomicLong globalWriterThreads = new AtomicLong(0);
public static Writer writer = null;
@@ -78,6 +85,8 @@ public jmongosysbenchexecute() {
}
public static void main (String[] args) throws Exception {
+ BasicConfigurator.configure();
+
if (args.length != 24) {
logMe("*** ERROR : CONFIGURATION ISSUE ***");
logMe("jsysbenchexecute [number of collections] [database name] [number of writer threads] [documents per collection] [seconds feedback] "+
@@ -172,6 +181,19 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
DB db = m.getDB(dbName);
+ final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metrics)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ consoleReporter.start(10, TimeUnit.SECONDS);
+
+ final CsvReporter csvReporter = CsvReporter.forRegistry(metrics)
+ .formatFor(Locale.US)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build(new File(logFileName));
+ csvReporter.start(1, TimeUnit.SECONDS);
+
// determine server type : mongo or tokumx
DBObject checkServerCmd = new BasicDBObject();
CommandResult commandResult = db.command("buildInfo");
@@ -188,12 +210,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
logMe(" index technology = %s",indexTechnology);
logMe("-------------------------------------------------------------------------------------------------");
- try {
- writer = new BufferedWriter(new FileWriter(new File(logFileName)));
- } catch (IOException e) {
- e.printStackTrace();
- }
-
if ((!indexTechnology.toLowerCase().equals("tokumx")) && (!indexTechnology.toLowerCase().equals("mongo"))) {
// unknown index technology, abort
logMe(" *** Unknown Indexing Technology %s, shutting down",indexTechnology);
@@ -213,24 +229,12 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
tWriterThreads[i].start();
}
- Thread reporterThread = new Thread(t.new MyReporter());
- reporterThread.start();
- reporterThread.join();
-
// wait for writer threads to terminate
for (int i=0; i 0)
- runEndMillis = t0 + (1000 * runSeconds);
-
- while ((System.currentTimeMillis() < runEndMillis) && (thisInserts < numMaxInserts))
- {
- try {
- Thread.sleep(100);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- long now = System.currentTimeMillis();
-
-
-// public static AtomicLong globalDeletes = new AtomicLong(0);
-// public static AtomicLong globalUpdates = new AtomicLong(0);
-// public static AtomicLong globalPointQueries = new AtomicLong(0);
-// public static AtomicLong globalRangeQueries = new AtomicLong(0);
-
-
- thisInserts = globalInserts.get();
- thisSysbenchTransactions = globalSysbenchTransactions.get();
-
- if ((now > nextFeedbackMillis) && (secondsPerFeedback > 0))
- {
- intervalNumber++;
- nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1));
-
- long elapsed = now - t0;
- long thisIntervalMs = now - lastMs;
- long thisWriterThreads = globalWriterThreads.get();
-
- long thisIntervalSysbenchTransactions = thisSysbenchTransactions - lastSysbenchTransactions;
- double thisIntervalSysbenchTransactionsPerSecond = thisIntervalSysbenchTransactions/(double)thisIntervalMs*1000.0;
- double thisSysbenchTransactionsPerSecond = thisSysbenchTransactions/(double)elapsed*1000.0;
-
- long thisIntervalInserts = thisInserts - lastInserts;
- double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0;
- double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0;
-
- logMe("%,d seconds : cum tps=%,.2f : int tps=%,.2f : cum ips=%,.2f : int ips=%,.2f : writers=%,d", elapsed / 1000l, thisSysbenchTransactionsPerSecond, thisIntervalSysbenchTransactionsPerSecond, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisWriterThreads);
-
- try {
- if (outputHeader)
- {
- writer.write("elap_secs\tcum_tps\tint_tps\tcum_ips\tint_ips\n");
- outputHeader = false;
- }
-
- String statusUpdate = "";
-
- statusUpdate = String.format("%d\t%.2f\t%.2f\t%.2f\t%.2f\n", elapsed / 1000l, thisSysbenchTransactionsPerSecond, thisIntervalSysbenchTransactionsPerSecond, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
-
- writer.write(statusUpdate);
- writer.flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- lastInserts = thisInserts;
- lastSysbenchTransactions = thisSysbenchTransactions;
-
- lastMs = now;
- }
- }
-
- // shutdown all the writers
- allDone = 1;
- }
- }
-
-
public static void logMe(String format, Object... args) {
System.out.println(Thread.currentThread() + String.format(format, args));
}
diff --git a/src/jmongosysbenchload.java b/src/jmongosysbenchload.java
index 420039e..fb8e46b 100644
--- a/src/jmongosysbenchload.java
+++ b/src/jmongosysbenchload.java
@@ -1,3 +1,8 @@
+import com.codahale.metrics.*;
+import org.apache.log4j.BasicConfigurator;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
//import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
@@ -27,10 +32,11 @@
import java.util.concurrent.locks.ReentrantLock;
public class jmongosysbenchload {
- public static AtomicLong globalInserts = new AtomicLong(0);
+ static final MetricRegistry metrics = new MetricRegistry();
+ private final Timer insertLatencies = metrics.timer(MetricRegistry.name("sysbench", "inserts"));
+
public static AtomicLong globalWriterThreads = new AtomicLong(0);
- public static Writer writer = null;
public static boolean outputHeader = true;
public static int numCollections;
@@ -56,6 +62,8 @@ public jmongosysbenchload() {
}
public static void main (String[] args) throws Exception {
+ BasicConfigurator.configure();
+
if (args.length != 15) {
logMe("*** ERROR : CONFIGURATION ISSUE ***");
logMe("jsysbenchload [number of collections] [database name] [number of writer threads] [documents per collection] [documents per insert] [inserts feedback] [seconds feedback] [log file name] [compression type] [basement node size (bytes)] [writeconcern] [server] [port] [username] [password]");
@@ -124,6 +132,19 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
DB db = m.getDB(dbName);
+ final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metrics)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ consoleReporter.start(10, TimeUnit.SECONDS);
+
+ final CsvReporter csvReporter = CsvReporter.forRegistry(metrics)
+ .formatFor(Locale.US)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build(new File(logFileName));
+ csvReporter.start(1, TimeUnit.SECONDS);
+
// determine server type : mongo or tokumx
DBObject checkServerCmd = new BasicDBObject();
CommandResult commandResult = db.command("buildInfo");
@@ -146,12 +167,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
logMe("--------------------------------------------------");
- try {
- writer = new BufferedWriter(new FileWriter(new File(logFileName)));
- } catch (IOException e) {
- e.printStackTrace();
- }
-
if ((!indexTechnology.toLowerCase().equals("tokumx")) && (!indexTechnology.toLowerCase().equals("mongo"))) {
// unknown index technology, abort
logMe(" *** Unknown Indexing Technology %s, shutting down",indexTechnology);
@@ -160,9 +175,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
jmongosysbenchload t = new jmongosysbenchload();
- Thread reporterThread = new Thread(t.new MyReporter());
- reporterThread.start();
-
Thread[] tWriterThreads = new Thread[writerThreads];
for (int collectionNumber = 0; collectionNumber < numCollections; collectionNumber++) {
@@ -215,17 +227,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) {
// all the writers are finished
allDone = 1;
- if (reporterThread.isAlive())
- reporterThread.join();
-
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
// m.dropDatabase("mydb");
m.close();
@@ -310,9 +311,13 @@ public void run() {
aDocs[i]=doc;
}
- coll.insert(aDocs);
- numInserts += documentsPerInsert;
- globalInserts.addAndGet(documentsPerInsert);
+ final Timer.Context context = insertLatencies.time();
+ try {
+ coll.insert(aDocs);
+ numInserts += documentsPerInsert;
+ } finally {
+ context.stop();
+ }
}
} catch (Exception e) {
@@ -340,117 +345,6 @@ public static String sysbenchString(java.util.Random rand, String thisMask) {
return sb.toString();
}
-
- // reporting thread, outputs information to console and file
- class MyReporter implements Runnable {
- public void run()
- {
- long t0 = System.currentTimeMillis();
- long lastInserts = 0;
- long lastMs = t0;
- long intervalNumber = 0;
- long nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1));
- long nextFeedbackInserts = lastInserts + insertsPerFeedback;
- long thisInserts = 0;
-
- while (allDone == 0)
- {
- try {
- Thread.sleep(100);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- long now = System.currentTimeMillis();
- thisInserts = globalInserts.get();
- if (((now > nextFeedbackMillis) && (secondsPerFeedback > 0)) ||
- ((thisInserts >= nextFeedbackInserts) && (insertsPerFeedback > 0)))
- {
- intervalNumber++;
- nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1));
- nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback;
-
- long elapsed = now - t0;
- long thisIntervalMs = now - lastMs;
-
- long thisIntervalInserts = thisInserts - lastInserts;
- double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0;
- double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0;
-
- if (secondsPerFeedback > 0)
- {
- logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- } else {
- logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- }
-
- try {
- if (outputHeader)
- {
- writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\n");
- outputHeader = false;
- }
-
- String statusUpdate = "";
-
- if (secondsPerFeedback > 0)
- {
- statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- } else {
- statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- }
- writer.write(statusUpdate);
- writer.flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- lastInserts = thisInserts;
-
- lastMs = now;
- }
- }
-
- // output final numbers...
- long now = System.currentTimeMillis();
- thisInserts = globalInserts.get();
- intervalNumber++;
- nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1));
- nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback;
- long elapsed = now - t0;
- long thisIntervalMs = now - lastMs;
- long thisIntervalInserts = thisInserts - lastInserts;
- double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0;
- double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0;
- if (secondsPerFeedback > 0)
- {
- logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- } else {
- logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- }
- try {
- if (outputHeader)
- {
- writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\n");
- outputHeader = false;
- }
- String statusUpdate = "";
- if (secondsPerFeedback > 0)
- {
- statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- } else {
- statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond);
- }
- writer.write(statusUpdate);
- writer.flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
- }
-
-
public static void logMe(String format, Object... args) {
System.out.println(Thread.currentThread() + String.format(format, args));
}