diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8ef90ee --- /dev/null +++ b/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + tokutek + iibench-mongodb + 0.0.1-SNAPSHOT + MongoDB iibench + + + target + target/classes + ${project.artifactId}-${project.version} + src + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 5 + 5 + + + + org.codehaus.mojo + exec-maven-plugin + 1.3.2 + + + + java + + + + + + + + + + io.dropwizard.metrics + metrics-core + 3.1.0 + + + org.slf4j + slf4j-log4j12 + 1.7.7 + + + org.mongodb + mongo-java-driver + 2.13.0 + + + diff --git a/run.simple.bash b/run.simple.bash index 4dc91c2..52c861d 100755 --- a/run.simple.bash +++ b/run.simple.bash @@ -92,17 +92,18 @@ export QUERY_NUM_DOCS_BEGIN=1000000 export CREATE_COLLECTION=Y -javac -cp $CLASSPATH:$PWD/src src/jmongoiibench.java +mvn compile export LOG_NAME=mongoiibench-${MAX_ROWS}-${NUM_DOCUMENTS_PER_INSERT}-${MAX_INSERTS_PER_SECOND}-${NUM_LOADER_THREADS}-${QUERIES_PER_INTERVAL}-${QUERY_INTERVAL_SECONDS}.txt -export BENCHMARK_TSV=${LOG_NAME}.tsv +export BENCHMARK_CSV_DIR=mongoiibench-${MAX_ROWS}-${NUM_DOCUMENTS_PER_INSERT}-${MAX_INSERTS_PER_SECOND}-${NUM_LOADER_THREADS}-${QUERIES_PER_INTERVAL}-${QUERY_INTERVAL_SECONDS}-csv rm -f $LOG_NAME -rm -f $BENCHMARK_TSV +rm -rf $BENCHMARK_CSV_DIR +mkdir $BENCHMARK_CSV_DIR T="$(date +%s)" -java -cp $CLASSPATH:$PWD/src jmongoiibench $DB_NAME $NUM_LOADER_THREADS $MAX_ROWS $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $MONGO_COMPRESSION $MONGO_BASEMENT $RUN_SECONDS $QUERIES_PER_INTERVAL $QUERY_INTERVAL_SECONDS $QUERY_LIMIT $QUERY_NUM_DOCS_BEGIN $MAX_INSERTS_PER_SECOND $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT $NUM_CHAR_FIELDS $LENGTH_CHAR_FIELDS $NUM_SECONDARY_INDEXES $PERCENT_COMPRESSIBLE $CREATE_COLLECTION | tee -a $LOG_NAME +mvn exec:java -Dexec.mainClass=jmongoiibench -Dexec.args="$DB_NAME $NUM_LOADER_THREADS $MAX_ROWS $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_CSV_DIR $MONGO_COMPRESSION $MONGO_BASEMENT $RUN_SECONDS $QUERIES_PER_INTERVAL $QUERY_INTERVAL_SECONDS $QUERY_LIMIT $QUERY_NUM_DOCS_BEGIN $MAX_INSERTS_PER_SECOND $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT $NUM_CHAR_FIELDS $LENGTH_CHAR_FIELDS $NUM_SECONDARY_INDEXES $PERCENT_COMPRESSIBLE $CREATE_COLLECTION" | tee -a $LOG_NAME echo "" | tee -a $LOG_NAME T="$(($(date +%s)-T))" printf "`date` | iibench duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME diff --git a/src/ShortConsoleReporter.java b/src/ShortConsoleReporter.java new file mode 100644 index 0000000..373fd1d --- /dev/null +++ b/src/ShortConsoleReporter.java @@ -0,0 +1,350 @@ +import com.codahale.metrics.*; +import java.io.PrintStream; +import java.text.DateFormat; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * A reporter which outputs measurements to a {@link PrintStream}, like {@code System.out}. + */ +public class ShortConsoleReporter extends ScheduledReporter { + /** + * Returns a new {@link Builder} for {@link ShortConsoleReporter}. + * + * @param registry the registry to report + * @return a {@link Builder} instance for a {@link ShortConsoleReporter} + */ + public static Builder forRegistry(MetricRegistry registry) { + return new Builder(registry); + } + + /** + * A builder for {@link ShortConsoleReporter} instances. Defaults to using the default locale and + * time zone, writing to {@code System.out}, converting rates to events/second, converting + * durations to milliseconds, printing headers every 24 output rows, and not filtering metrics. + */ + public static class Builder { + private final MetricRegistry registry; + private PrintStream output; + private Locale locale; + private Clock clock; + private TimeZone timeZone; + private TimeUnit rateUnit; + private TimeUnit durationUnit; + private int rowsBetweenHeaders; + private MetricFilter filter; + + private Builder(MetricRegistry registry) { + this.registry = registry; + this.output = System.out; + this.locale = Locale.getDefault(); + this.clock = Clock.defaultClock(); + this.timeZone = TimeZone.getDefault(); + this.rateUnit = TimeUnit.SECONDS; + this.durationUnit = TimeUnit.MILLISECONDS; + this.rowsBetweenHeaders = 24; + this.filter = MetricFilter.ALL; + } + + /** + * Write to the given {@link PrintStream}. + * + * @param output a {@link PrintStream} instance. + * @return {@code this} + */ + public Builder outputTo(PrintStream output) { + this.output = output; + return this; + } + + /** + * Format numbers for the given {@link Locale}. + * + * @param locale a {@link Locale} + * @return {@code this} + */ + public Builder formattedFor(Locale locale) { + this.locale = locale; + return this; + } + + /** + * Use the given {@link Clock} instance for the time. + * + * @param clock a {@link Clock} instance + * @return {@code this} + */ + public Builder withClock(Clock clock) { + this.clock = clock; + return this; + } + + /** + * Use the given {@link TimeZone} for the time. + * + * @param timeZone a {@link TimeZone} + * @return {@code this} + */ + public Builder formattedFor(TimeZone timeZone) { + this.timeZone = timeZone; + return this; + } + + /** + * Convert rates to the given time unit. + * + * @param rateUnit a unit of time + * @return {@code this} + */ + public Builder convertRatesTo(TimeUnit rateUnit) { + this.rateUnit = rateUnit; + return this; + } + + /** + * Convert durations to the given time unit. + * + * @param durationUnit a unit of time + * @return {@code this} + */ + public Builder convertDurationsTo(TimeUnit durationUnit) { + this.durationUnit = durationUnit; + return this; + } + + /** + * Only report metrics which match the given filter. + * + * @param filter a {@link MetricFilter} + * @return {@code this} + */ + public Builder filter(MetricFilter filter) { + this.filter = filter; + return this; + } + + /** + * Print a dstat-like header every n rows. + * + * @param n rows of data between header lines + * @return {@code this} + */ + public Builder setRowsBetweenHeaders(int rowsBetweenHeaders) { + this.rowsBetweenHeaders = rowsBetweenHeaders; + return this; + } + + /** + * Builds a {@link ShortConsoleReporter} with the given properties. + * + * @return a {@link ShortConsoleReporter} + */ + public ShortConsoleReporter build() { + return new ShortConsoleReporter(registry, + output, + locale, + clock, + timeZone, + rateUnit, + durationUnit, + rowsBetweenHeaders, + filter); + } + } + + private static final int CONSOLE_WIDTH = 80; + + private final PrintStream output; + private final Locale locale; + private final Clock clock; + private final DateFormat dateFormat; + private final int rowsBetweenHeaders; + private int rowsSinceHeader; + + private ShortConsoleReporter(MetricRegistry registry, + PrintStream output, + Locale locale, + Clock clock, + TimeZone timeZone, + TimeUnit rateUnit, + TimeUnit durationUnit, + int rowsBetweenHeaders, + MetricFilter filter) { + super(registry, "short-console-reporter", filter, rateUnit, durationUnit); + this.output = output; + this.locale = locale; + this.clock = clock; + this.dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, + DateFormat.MEDIUM, + locale); + this.rowsBetweenHeaders = rowsBetweenHeaders; + dateFormat.setTimeZone(timeZone); + + // Force header at beginning. + rowsSinceHeader = rowsBetweenHeaders; + } + + private void printHeader(String name, int width) { + for (int i = 0; i < (width - name.length()) / 2; ++i) { + output.print('-'); + } + output.print(' '); + output.print(name); + output.print(' '); + for (int i = 0; i < width - name.length() - ((width - name.length()) / 2); ++i) { + output.print('-'); + } + } + + private void printSubHeader(String name, int width) { + for (int i = 0; i < width - name.length(); ++i) { + output.print(' '); + } + output.print(name); + } + + @Override + public void report(SortedMap gauges, + SortedMap counters, + SortedMap histograms, + SortedMap meters, + SortedMap timers) { + final String dateTime = dateFormat.format(new Date(clock.getTime())); + + if (rowsSinceHeader == rowsBetweenHeaders) { + printHeader("time", dateTime.length()); + output.print(' '); + + for (Map.Entry entry : gauges.entrySet()) { + printHeader(entry.getKey(), 12); + output.print(' '); + } + + for (Map.Entry entry : counters.entrySet()) { + printHeader(entry.getKey(), 12); + output.print(' '); + } + + for (Map.Entry entry : histograms.entrySet()) { + printHeader(entry.getKey(), 64); + output.print(' '); + } + + for (Map.Entry entry : meters.entrySet()) { + printHeader(entry.getKey(), 12); + output.print(' '); + } + + for (Map.Entry entry : timers.entrySet()) { + printHeader(entry.getKey(), 64); + output.print(' '); + } + + output.println(); + + output.print(' '); + printSubHeader("", dateTime.length()); + output.print(" |"); + + for (Map.Entry entry : gauges.entrySet()) { + output.print(' '); + printSubHeader("value", 12); + output.print(" |"); + } + + for (Map.Entry entry : counters.entrySet()) { + output.print(' '); + printSubHeader("count", 12); + output.print(" |"); + } + + for (Map.Entry entry : histograms.entrySet()) { + output.print(' '); + printSubHeader("count", 12); + output.print(' '); + printSubHeader("mean", 12); + output.print(' '); + printSubHeader("median", 12); + output.print(' '); + printSubHeader("99", 12); + output.print(' '); + printSubHeader("99.9", 12); + output.print(" |"); + } + + for (Map.Entry entry : meters.entrySet()) { + output.print(' '); + printSubHeader("mean", 12); + output.print(" |"); + } + + for (Map.Entry entry : timers.entrySet()) { + output.print(' '); + printSubHeader("count", 12); + output.print(' '); + printSubHeader("mean", 12); + output.print(' '); + printSubHeader("median", 12); + output.print(' '); + printSubHeader("99", 12); + output.print(' '); + printSubHeader("99.9", 12); + output.print(" |"); + } + + output.println(); + rowsSinceHeader = 0; + } + rowsSinceHeader++; + + output.print(' '); + output.print(dateTime); + output.print(" |"); + + if (!gauges.isEmpty()) { + for (Map.Entry entry : gauges.entrySet()) { + output.printf(locale, " %12s |", entry.getValue().getValue()); + } + } + + if (!counters.isEmpty()) { + for (Map.Entry entry : counters.entrySet()) { + output.printf(locale, " %12d |", entry.getValue().getCount()); + } + } + + if (!histograms.isEmpty()) { + for (Map.Entry entry : histograms.entrySet()) { + Snapshot s = entry.getValue().getSnapshot(); + output.printf(locale, " %12d %12.3f %12.3f %12.3f %12.3f |", + entry.getValue().getCount(), + s.getMean(), + s.getMedian(), + s.get99thPercentile(), + s.get999thPercentile()); + } + } + + if (!meters.isEmpty()) { + for (Map.Entry entry : meters.entrySet()) { + output.printf(locale, " %12.3f |", convertRate(entry.getValue().getMeanRate())); + } + } + + if (!timers.isEmpty()) { + for (Map.Entry entry : timers.entrySet()) { + Snapshot s = entry.getValue().getSnapshot(); + output.printf(locale, " %12d %12.3f %12.3f %12.3f %12.3f |", + entry.getValue().getCount(), + convertDuration(s.getMean()), + convertDuration(s.getMedian()), + convertDuration(s.get99thPercentile()), + convertDuration(s.get999thPercentile())); + } + } + + output.println(); + output.flush(); + } + +} diff --git a/src/jmongoiibench.java b/src/jmongoiibench.java index 66dea0c..a09065b 100644 --- a/src/jmongoiibench.java +++ b/src/jmongoiibench.java @@ -1,3 +1,8 @@ +import com.codahale.metrics.*; +import org.apache.log4j.BasicConfigurator; +import java.util.concurrent.TimeUnit; +import java.util.Locale; + //import com.mongodb.Mongo; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; @@ -26,12 +31,13 @@ import java.util.concurrent.locks.ReentrantLock; public class jmongoiibench { - public static AtomicLong globalInserts = new AtomicLong(0); + static final MetricRegistry metrics = new MetricRegistry(); + private final Timer insertLatencies = metrics.timer(MetricRegistry.name("iib", "inserts")); + private final Timer queryLatencies = metrics.timer(MetricRegistry.name("iib", "queries")); + private final Meter exceptions = metrics.meter(MetricRegistry.name("iib", "exc")); + public static AtomicLong globalWriterThreads = new AtomicLong(0); public static AtomicLong globalQueryThreads = new AtomicLong(0); - public static AtomicLong globalQueriesExecuted = new AtomicLong(0); - public static AtomicLong globalQueriesTimeMs = new AtomicLong(0); - public static AtomicLong globalQueriesStarted = new AtomicLong(0); public static AtomicLong globalInsertExceptions = new AtomicLong(0); public static Writer writer = null; @@ -83,6 +89,8 @@ public jmongoiibench() { } public static void main (String[] args) throws Exception { + BasicConfigurator.configure(); + if (args.length != 23) { logMe("*** ERROR : CONFIGURATION ISSUE ***"); logMe("jmongoiibench [database name] [number of writer threads] [documents per collection] [documents per insert] [inserts feedback] [seconds feedback] [log file name] [compression type] [basement node size (bytes)] [number of seconds to run] [queries per interval] [interval (seconds)] [query limit] [inserts for begin query] [max inserts per second] [writeconcern] [server] [port] [num char fields] [length char fields] [num secondary indexes] [percent compressible] [create collection]"); @@ -201,6 +209,19 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { DB db = m.getDB(dbName); + final ShortConsoleReporter consoleReporter = ShortConsoleReporter.forRegistry(metrics) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + consoleReporter.start(10, TimeUnit.SECONDS); + + final CsvReporter csvReporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(logFileName)); + csvReporter.start(1, TimeUnit.SECONDS); + // determine server type : mongo or tokumx DBObject checkServerCmd = new BasicDBObject(); CommandResult commandResult = db.command("buildInfo"); @@ -233,12 +254,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { numMaxInserts = numMaxInserts / writerThreads; } - try { - writer = new BufferedWriter(new FileWriter(new File(logFileName))); - } catch (IOException e) { - e.printStackTrace(); - } - if (createCollection.equals("n")) { logMe("Skipping collection creation"); @@ -309,9 +324,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { jmongoiibench t = new jmongoiibench(); - Thread reporterThread = new Thread(t.new MyReporter()); - reporterThread.start(); - Thread queryThread = new Thread(t.new MyQuery(1, 1, numMaxInserts, db)); if (queriesPerMinute > 0.0) { queryThread.start(); @@ -332,10 +344,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { e.printStackTrace(); } - // wait for reporter thread to terminate - if (reporterThread.isAlive()) - reporterThread.join(); - // wait for query thread to terminate if (queryThread.isAlive()) queryThread.join(); @@ -346,14 +354,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { tWriterThreads[i].join(); } - try { - if (writer != null) { - writer.close(); - } - } catch (IOException e) { - e.printStackTrace(); - } - // m.dropDatabase("mydb"); m.close(); @@ -426,15 +426,16 @@ public void run() { aDocs[i]=doc; } + final Timer.Context context = insertLatencies.time(); try { coll.insert(aDocs); numInserts += documentsPerInsert; - globalInserts.addAndGet(documentsPerInsert); - } catch (Exception e) { logMe("Writer thread %d : EXCEPTION",threadNumber); e.printStackTrace(); - globalInsertExceptions.incrementAndGet(); + exceptions.mark(); + } finally { + context.stop(); } if (allDone == 1) @@ -501,13 +502,11 @@ public void run() { nextQueryMillis = thisNow + msBetweenQueries; // check if number of inserts reached - if (globalInserts.get() >= queryBeginNumDocs) { + if (insertLatencies.getCount() >= queryBeginNumDocs) { if (outputStarted) { logMe("Query thread %d : now running",threadNumber,queryBeginNumDocs); outputStarted = false; - // set query start time - globalQueriesStarted.set(thisNow); } whichQuery++; @@ -674,22 +673,20 @@ def generate_register_query(row_count, start_time): } //logMe("Executed query %d",whichQuery); - long now = System.currentTimeMillis(); - DBCursor cursor = coll.find(query,keys).limit(queryLimit); + final Timer.Context context = queryLatencies.time(); try { - while(cursor.hasNext()) { - //System.out.println(cursor.next()); - cursor.next(); + DBCursor cursor = coll.find(query,keys).limit(queryLimit); + try { + while(cursor.hasNext()) { + //System.out.println(cursor.next()); + cursor.next(); + } + } finally { + cursor.close(); } } finally { - cursor.close(); + context.stop(); } - long elapsed = System.currentTimeMillis() - now; - - //logMe("Query thread %d : performing : %s",threadNumber,thisSelect); - - globalQueriesExecuted.incrementAndGet(); - globalQueriesTimeMs.addAndGet(elapsed); } else { if (outputWaiting) { @@ -709,190 +706,6 @@ def generate_register_query(row_count, start_time): } } - - // reporting thread, outputs information to console and file - class MyReporter implements Runnable { - public void run() - { - long t0 = System.currentTimeMillis(); - long lastInserts = 0; - long lastQueriesNum = 0; - long lastQueriesMs = 0; - long lastMs = t0; - long intervalNumber = 0; - long nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - long nextFeedbackInserts = lastInserts + insertsPerFeedback; - long thisInserts = 0; - long thisQueriesNum = 0; - long thisQueriesMs = 0; - long thisQueriesStarted = 0; - long endDueToTime = System.currentTimeMillis() + (1000 * numSeconds); - - while (allDone == 0) - { - try { - Thread.sleep(100); - } catch (Exception e) { - e.printStackTrace(); - } - - long now = System.currentTimeMillis(); - - if (now >= endDueToTime) - { - allDone = 1; - } - - thisInserts = globalInserts.get(); - thisQueriesNum = globalQueriesExecuted.get(); - thisQueriesMs = globalQueriesTimeMs.get(); - thisQueriesStarted = globalQueriesStarted.get(); - if (((now > nextFeedbackMillis) && (secondsPerFeedback > 0)) || - ((thisInserts >= nextFeedbackInserts) && (insertsPerFeedback > 0))) - { - intervalNumber++; - nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback; - - long elapsed = now - t0; - long thisIntervalMs = now - lastMs; - - long thisIntervalInserts = thisInserts - lastInserts; - double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; - double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; - - long thisIntervalQueriesNum = thisQueriesNum - lastQueriesNum; - long thisIntervalQueriesMs = thisQueriesMs - lastQueriesMs; - double thisIntervalQueryAvgMs = 0; - double thisQueryAvgMs = 0; - double thisIntervalAvgQPM = 0; - double thisAvgQPM = 0; - - long thisInsertExceptions = globalInsertExceptions.get(); - - if (thisIntervalQueriesNum > 0) { - thisIntervalQueryAvgMs = thisIntervalQueriesMs/(double)thisIntervalQueriesNum; - } - if (thisQueriesNum > 0) { - thisQueryAvgMs = thisQueriesMs/(double)thisQueriesNum; - } - - if (thisQueriesStarted > 0) - { - long adjustedElapsed = now - thisQueriesStarted; - if (adjustedElapsed > 0) - { - thisAvgQPM = (double)thisQueriesNum/((double)adjustedElapsed/1000.0/60.0); - } - if (thisIntervalMs > 0) - { - thisIntervalAvgQPM = (double)thisIntervalQueriesNum/((double)thisIntervalMs/1000.0/60.0); - } - } - - if (secondsPerFeedback > 0) - { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f : cum avg qry=%,.2f : int avg qry=%,.2f : cum avg qpm=%,.2f : int avg qpm=%,.2f : exceptions=%,d", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM, thisInsertExceptions); - } else { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f : cum avg qry=%,.2f : int avg qry=%,.2f : cum avg qpm=%,.2f : int avg qpm=%,.2f : exceptions=%,d", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM, thisInsertExceptions); - } - - try { - if (outputHeader) - { - writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\tcum_qry_avg\tint_qry_avg\tcum_qpm\tint_qpm\texceptions\n"); - outputHeader = false; - } - - String statusUpdate = ""; - - if (secondsPerFeedback > 0) - { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%,d\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM, thisInsertExceptions); - } else { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%,d\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM, thisInsertExceptions); - } - writer.write(statusUpdate); - writer.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - - lastInserts = thisInserts; - lastQueriesNum = thisQueriesNum; - lastQueriesMs = thisQueriesMs; - - lastMs = now; - } - } - - // output final numbers... - long now = System.currentTimeMillis(); - thisInserts = globalInserts.get(); - thisQueriesNum = globalQueriesExecuted.get(); - thisQueriesMs = globalQueriesTimeMs.get(); - thisQueriesStarted = globalQueriesStarted.get(); - intervalNumber++; - nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback; - long elapsed = now - t0; - long thisIntervalMs = now - lastMs; - long thisIntervalInserts = thisInserts - lastInserts; - double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; - double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; - long thisIntervalQueriesNum = thisQueriesNum - lastQueriesNum; - long thisIntervalQueriesMs = thisQueriesMs - lastQueriesMs; - double thisIntervalQueryAvgMs = 0; - double thisQueryAvgMs = 0; - double thisIntervalAvgQPM = 0; - double thisAvgQPM = 0; - if (thisIntervalQueriesNum > 0) { - thisIntervalQueryAvgMs = thisIntervalQueriesMs/(double)thisIntervalQueriesNum; - } - if (thisQueriesNum > 0) { - thisQueryAvgMs = thisQueriesMs/(double)thisQueriesNum; - } - if (thisQueriesStarted > 0) - { - long adjustedElapsed = now - thisQueriesStarted; - if (adjustedElapsed > 0) - { - thisAvgQPM = (double)thisQueriesNum/((double)adjustedElapsed/1000.0/60.0); - } - if (thisIntervalMs > 0) - { - thisIntervalAvgQPM = (double)thisIntervalQueriesNum/((double)thisIntervalMs/1000.0/60.0); - } - } - if (secondsPerFeedback > 0) - { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f : cum avg qry=%,.2f : int avg qry=%,.2f : cum avg qpm=%,.2f : int avg qpm=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM); - } else { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f : cum avg qry=%,.2f : int avg qry=%,.2f : cum avg qpm=%,.2f : int avg qpm=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM); - } - try { - if (outputHeader) - { - writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\tcum_qry_avg\tint_qry_avg\tcum_qpm\tint_qpm\n"); - outputHeader = false; - } - String statusUpdate = ""; - if (secondsPerFeedback > 0) - { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM); - } else { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisQueryAvgMs, thisIntervalQueryAvgMs, thisAvgQPM, thisIntervalAvgQPM); - } - writer.write(statusUpdate); - writer.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - } - - public static void logMe(String format, Object... args) { System.out.println(Thread.currentThread() + String.format(format, args)); }