diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 0244c52bb..823589ee8 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -468,14 +468,14 @@ jobs:
# In this case, we load the tpcc data.
if [[ ${{matrix.benchmark}} == templated ]]; then
java -jar benchbase.jar -b tpcc -c config/postgres/sample_tpcc_config.xml --create=true --load=true --execute=false --json-histograms results/histograms.json
- java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/postgres/sample_${{matrix.benchmark}}_config.xml --create=false --load=false --execute=true --json-histograms results/histograms.json
+ java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/postgres/sample_${{matrix.benchmark}}_config.xml -im 1000 -mt advanced --create=false --load=false --execute=true --json-histograms results/histograms.json
elif [[ ${{matrix.benchmark}} == tpcc-with-reconnects ]]; then
# See Also: WITH_SERVICE_INTERRUPTIONS=true docker/build-run-benchmark-with-docker.sh
java -jar benchbase.jar -b tpcc -c config/postgres/sample_tpcc_config.xml --create=true --load=true
(sleep 10 && ./scripts/interrupt-docker-db-service.sh postgres) &
- java -jar benchbase.jar -b tpcc -c config/postgres/sample_tpcc_config.xml --execute=true --json-histograms results/histograms.json
+ java -jar benchbase.jar -b tpcc -c config/postgres/sample_tpcc_config.xml -im 1000 -mt advanced --execute=true --json-histograms results/histograms.json
else
- java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/postgres/sample_${{matrix.benchmark}}_config.xml --create=true --load=true --execute=true --json-histograms results/histograms.json
+ java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/postgres/sample_${{matrix.benchmark}}_config.xml -im 1000 -mt advanced --create=true --load=true --execute=true --json-histograms results/histograms.json
fi
# FIXME: Reduce the error rate so we don't need these overrides.
@@ -491,6 +491,12 @@ jobs:
./scripts/check_latest_benchmark_results.sh $results_benchmark
./scripts/check_histogram_results.sh results/histograms.json $ERRORS_THRESHOLD
+ # Running the monitor should create at least three files in the 'monitor' directory.
+ if ![ $(find "./results/monitor" -maxdepth 1 -mindepth 1 | wc -l) -gt 2]; then
+ echo "ERROR: Advanced monitoring unsuccessful, file directory and/or appropriate files not created." >&2
+ exit 1
+ fi
+
- name: Stop custom postgres service
run: |
./docker/postgres-latest/down.sh
@@ -638,14 +644,14 @@ jobs:
# In this case, we load the tpcc data.
if [[ ${{matrix.benchmark}} == templated ]]; then
java -jar benchbase.jar -b tpcc -c config/sqlserver/sample_tpcc_config.xml --create=true --load=true --execute=false --json-histograms results/histograms.json
- java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/sqlserver/sample_${{matrix.benchmark}}_config.xml --create=false --load=false --execute=true --json-histograms results/histograms.json
+ java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/sqlserver/sample_${{matrix.benchmark}}_config.xml -im 1000 -mt advanced --create=false --load=false --execute=true --json-histograms results/histograms.json
elif [[ ${{matrix.benchmark}} == tpcc-with-reconnects ]]; then
# See Also: WITH_SERVICE_INTERRUPTIONS=true docker/build-run-benchmark-with-docker.sh
java -jar benchbase.jar -b tpcc -c config/sqlserver/sample_tpcc_config.xml --create=true --load=true
(sleep 10 && ./scripts/interrupt-docker-db-service.sh sqlserver) &
- java -jar benchbase.jar -b tpcc -c config/sqlserver/sample_tpcc_config.xml --execute=true --json-histograms results/histograms.json
+ java -jar benchbase.jar -b tpcc -c config/sqlserver/sample_tpcc_config.xml -im 1000 -mt advanced --execute=true --json-histograms results/histograms.json
else
- java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/sqlserver/sample_${{matrix.benchmark}}_config.xml --create=true --load=true --execute=true --json-histograms results/histograms.json
+ java -jar benchbase.jar -b ${{matrix.benchmark}} -c config/sqlserver/sample_${{matrix.benchmark}}_config.xml -im 1000 -mt advanced --create=true --load=true --execute=true --json-histograms results/histograms.json
fi
# FIXME: Reduce the error rate so we don't need these overrides.
@@ -659,6 +665,12 @@ jobs:
./scripts/check_latest_benchmark_results.sh $results_benchmark
./scripts/check_histogram_results.sh results/histograms.json $ERRORS_THRESHOLD
+ # Running the monitor should create at least three files in the 'monitor' directory.
+ if ![ $(find "./results/monitor" -maxdepth 1 -mindepth 1 | wc -l) -gt 2]; then
+ echo "ERROR: Advanced monitoring unsuccessful, file directory and/or appropriate files not created." >&2
+ exit 1
+ fi
+
## ----------------------------------------------------------------------------------
## Docker Build Test Publish
## ----------------------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c469cd43c..fc23ca732 100644
--- a/pom.xml
+++ b/pom.xml
@@ -349,6 +349,13 @@
janino
3.1.12
+
+ org.immutables
+ value
+ 2.9.0
+ provided
+
+
diff --git a/src/main/java/com/oltpbenchmark/DBWorkload.java b/src/main/java/com/oltpbenchmark/DBWorkload.java
index f4108c27e..d5af2f5d6 100644
--- a/src/main/java/com/oltpbenchmark/DBWorkload.java
+++ b/src/main/java/com/oltpbenchmark/DBWorkload.java
@@ -79,11 +79,27 @@ public static void main(String[] args) throws Exception {
return;
}
- // Seconds
- int intervalMonitor = 0;
+ // Monitoring setup.
+ ImmutableMonitorInfo.Builder builder = ImmutableMonitorInfo.builder();
if (argsLine.hasOption("im")) {
- intervalMonitor = Integer.parseInt(argsLine.getOptionValue("im"));
+ builder.monitoringInterval(Integer.parseInt(argsLine.getOptionValue("im")));
}
+ if (argsLine.hasOption("mt")) {
+ switch (argsLine.getOptionValue("mt")) {
+ case "advanced":
+ builder.monitoringType(MonitorInfo.MonitoringType.ADVANCED);
+ break;
+ case "throughput":
+ builder.monitoringType(MonitorInfo.MonitoringType.THROUGHPUT);
+ break;
+ default:
+ throw new ParseException(
+ "Monitoring type '"
+ + argsLine.getOptionValue("mt")
+ + "' is undefined, allowed values are: advanced/throughput");
+ }
+ }
+ MonitorInfo monitorInfo = builder.build();
// -------------------------------------------------------------------
// GET PLUGIN LIST
@@ -151,6 +167,14 @@ public static void main(String[] args) throws Exception {
// Nothing to do here !
}
+ // Set monitoring enabled, if all requirements are met.
+ if (monitorInfo.getMonitoringInterval() > 0
+ && monitorInfo.getMonitoringType() == MonitorInfo.MonitoringType.ADVANCED
+ && DatabaseType.get(xmlConfig.getString("type")).shouldCreateMonitoringPrefix()) {
+ LOG.info("Advanced monitoring enabled, prefix will be added to queries.");
+ wrkld.setAdvancedMonitoringEnabled(true);
+ }
+
// ----------------------------------------------------------------
// CREATE BENCHMARK MODULE
// ----------------------------------------------------------------
@@ -518,7 +542,7 @@ public static void main(String[] args) throws Exception {
if (isBooleanOptionSet(argsLine, "execute")) {
// Bombs away!
try {
- Results r = runWorkload(benchList, intervalMonitor);
+ Results r = runWorkload(benchList, monitorInfo);
writeOutputs(r, activeTXTypes, argsLine, xmlConfig);
writeHistograms(r);
@@ -558,8 +582,8 @@ private static Options buildOptions(XMLConfiguration pluginConfig) {
options.addOption(null, "execute", true, "Execute the benchmark workload");
options.addOption("h", "help", false, "Print this help");
options.addOption("s", "sample", true, "Sampling window");
- options.addOption(
- "im", "interval-monitor", true, "Throughput Monitoring Interval in milliseconds");
+ options.addOption("im", "interval-monitor", true, "Monitoring Interval in milliseconds");
+ options.addOption("mt", "monitor-type", true, "Type of Monitoring (throughput/advanced)");
options.addOption(
"d",
"directory",
@@ -733,7 +757,7 @@ private static void runLoader(BenchmarkModule bench)
bench.loadDatabase();
}
- private static Results runWorkload(List benchList, int intervalMonitor)
+ private static Results runWorkload(List benchList, MonitorInfo monitorInfo)
throws IOException {
List> workers = new ArrayList<>();
List workConfs = new ArrayList<>();
@@ -748,7 +772,7 @@ private static Results runWorkload(List benchList, int interval
bench.getBenchmarkName().toUpperCase(), num_phases, (num_phases > 1 ? "s" : "")));
workConfs.add(bench.getWorkloadConfiguration());
}
- Results r = ThreadBench.runRateLimitedBenchmark(workers, workConfs, intervalMonitor);
+ Results r = ThreadBench.runRateLimitedBenchmark(workers, workConfs, monitorInfo);
LOG.info(SINGLE_LINE);
LOG.info("Rate limited reqs/s: {}", r);
return r;
diff --git a/src/main/java/com/oltpbenchmark/ThreadBench.java b/src/main/java/com/oltpbenchmark/ThreadBench.java
index 9161ae904..4f1cb93ac 100644
--- a/src/main/java/com/oltpbenchmark/ThreadBench.java
+++ b/src/main/java/com/oltpbenchmark/ThreadBench.java
@@ -1,17 +1,15 @@
/*
* Copyright 2020 by OLTPBenchmark Project
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*
*/
@@ -21,7 +19,10 @@
import com.oltpbenchmark.api.BenchmarkModule;
import com.oltpbenchmark.api.TransactionType;
import com.oltpbenchmark.api.Worker;
+import com.oltpbenchmark.api.collectors.monitoring.Monitor;
+import com.oltpbenchmark.api.collectors.monitoring.MonitorGen;
import com.oltpbenchmark.types.State;
+import com.oltpbenchmark.util.MonitorInfo;
import com.oltpbenchmark.util.StringUtil;
import java.util.*;
import org.apache.commons.collections4.map.ListOrderedMap;
@@ -30,30 +31,35 @@
public class ThreadBench implements Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(ThreadBench.class);
+ // Determines how long (in ms) to wait until monitoring thread rejoins the
+ // main thread.
+ private static final int MONITOR_REJOIN_TIME = 60000;
private final BenchmarkState testState;
private final List extends Worker extends BenchmarkModule>> workers;
private final ArrayList workerThreads;
private final List workConfs;
private final ArrayList samples = new ArrayList<>();
- private final int intervalMonitor;
+ private final MonitorInfo monitorInfo;
+
+ private Monitor monitor = null;
private ThreadBench(
List extends Worker extends BenchmarkModule>> workers,
List workConfs,
- int intervalMonitoring) {
+ MonitorInfo monitorInfo) {
this.workers = workers;
this.workConfs = workConfs;
this.workerThreads = new ArrayList<>(workers.size());
- this.intervalMonitor = intervalMonitoring;
+ this.monitorInfo = monitorInfo;
this.testState = new BenchmarkState(workers.size() + 1);
}
public static Results runRateLimitedBenchmark(
List> workers,
List workConfs,
- int intervalMonitoring) {
- ThreadBench bench = new ThreadBench(workers, workConfs, intervalMonitoring);
+ MonitorInfo monitorInfo) {
+ ThreadBench bench = new ThreadBench(workers, workConfs, monitorInfo);
return bench.runRateLimitedMultiPhase();
}
@@ -88,10 +94,9 @@ private int finalizeWorkers(ArrayList workerThreads) throws InterruptedE
// to terminate... hands otherwise
/*
- * // CARLO: Maybe we might want to do this to kill threads that are
- * hanging... if (workerThreads.get(i).isAlive()) {
- * workerThreads.get(i).kill(); try { workerThreads.get(i).join(); }
- * catch (InterruptedException e) { } }
+ * // CARLO: Maybe we might want to do this to kill threads that are hanging... if
+ * (workerThreads.get(i).isAlive()) { workerThreads.get(i).kill(); try {
+ * workerThreads.get(i).join(); } catch (InterruptedException e) { } }
*/
requests += workers.get(i).getRequests();
@@ -116,17 +121,11 @@ private Results runRateLimitedMultiPhase() {
this.createWorkerThreads();
// long measureStart = start;
+ Phase phase = null;
- long startTs = System.currentTimeMillis();
- long start = System.nanoTime();
- long warmupStart = System.nanoTime();
- long warmup = warmupStart;
- long measureEnd = -1;
// used to determine the longest sleep interval
double lowestRate = Double.MAX_VALUE;
- Phase phase = null;
-
for (WorkloadState workState : workStates) {
workState.switchToNextPhase();
phase = workState.getCurrentPhase();
@@ -145,6 +144,12 @@ private Results runRateLimitedMultiPhase() {
}
}
+ long startTs = System.currentTimeMillis();
+ long start = System.nanoTime();
+ long warmupStart = System.nanoTime();
+ long warmup = warmupStart;
+ long measureEnd = -1;
+
long intervalNs = getInterval(lowestRate, phase.getArrival());
long nextInterval = start + intervalNs;
@@ -157,8 +162,11 @@ private Results runRateLimitedMultiPhase() {
boolean lastEntry = false;
// Initialize the Monitor
- if (this.intervalMonitor > 0) {
- new MonitorThread(this.intervalMonitor).start();
+ if (this.monitorInfo.getMonitoringInterval() > 0) {
+ this.monitor =
+ MonitorGen.getMonitor(
+ this.monitorInfo, this.testState, this.workers, this.workConfs.get(0));
+ this.monitor.start();
}
// Allow workers to start work.
@@ -301,6 +309,18 @@ private Results runRateLimitedMultiPhase() {
}
}
+ // Stop the monitoring thread separately from cleanup all the workers so we can ignore errors
+ // from these threads (including possible SQLExceptions), but not the others.
+ try {
+ if (this.monitor != null) {
+ this.monitor.interrupt();
+ this.monitor.join(MONITOR_REJOIN_TIME);
+ this.monitor.tearDown();
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+
try {
int requests = finalizeWorkers(this.workerThreads);
@@ -528,42 +548,4 @@ public void run() {
}
}
}
-
- private class MonitorThread extends Thread {
- private final int intervalMonitor;
-
- {
- this.setDaemon(true);
- }
-
- /**
- * @param interval How long to wait between polling in milliseconds
- */
- MonitorThread(int interval) {
- this.intervalMonitor = interval;
- }
-
- @Override
- public void run() {
- LOG.info("Starting MonitorThread Interval [{}ms]", this.intervalMonitor);
- while (true) {
- try {
- Thread.sleep(this.intervalMonitor);
- } catch (InterruptedException ex) {
- return;
- }
-
- // Compute the last throughput
- long measuredRequests = 0;
- synchronized (testState) {
- for (Worker> w : workers) {
- measuredRequests += w.getAndResetIntervalRequests();
- }
- }
- double seconds = this.intervalMonitor / 1000d;
- double tps = (double) measuredRequests / seconds;
- LOG.info("Throughput: {} txn/sec", tps);
- }
- }
- }
}
diff --git a/src/main/java/com/oltpbenchmark/WorkloadConfiguration.java b/src/main/java/com/oltpbenchmark/WorkloadConfiguration.java
index abf327004..5ace0362c 100644
--- a/src/main/java/com/oltpbenchmark/WorkloadConfiguration.java
+++ b/src/main/java/com/oltpbenchmark/WorkloadConfiguration.java
@@ -1,17 +1,15 @@
/*
* Copyright 2020 by OLTPBenchmark Project
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*
*/
@@ -47,6 +45,7 @@ public class WorkloadConfiguration {
private int isolationMode = Connection.TRANSACTION_SERIALIZABLE;
private String dataDir = null;
private String ddlPath = null;
+ private boolean advancedMonitoringEnabled = false;
/**
* If true, establish a new connection for each transaction, otherwise use one persistent
@@ -129,6 +128,14 @@ public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
+ public void setAdvancedMonitoringEnabled(boolean advancedMonitoringEnabled) {
+ this.advancedMonitoringEnabled = true;
+ }
+
+ public boolean getAdvancedMonitoringEnabled() {
+ return this.advancedMonitoringEnabled;
+ }
+
/**
* @return @see newConnectionPerTxn member docs for behavior.
*/
diff --git a/src/main/java/com/oltpbenchmark/api/BenchmarkModule.java b/src/main/java/com/oltpbenchmark/api/BenchmarkModule.java
index e36752407..d8e9dbb9f 100644
--- a/src/main/java/com/oltpbenchmark/api/BenchmarkModule.java
+++ b/src/main/java/com/oltpbenchmark/api/BenchmarkModule.java
@@ -1,17 +1,15 @@
/*
* Copyright 2020 by OLTPBenchmark Project
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*
*/
@@ -383,6 +381,9 @@ public Map getProcedures() {
proc.initialize(this.workConf.getDatabaseType());
proc_xref.put(txn, proc);
proc.loadSQLDialect(this.dialects);
+ if (this.workConf.getAdvancedMonitoringEnabled()) {
+ proc.enabledAdvancedMonitoring();
+ }
}
}
if (proc_xref.isEmpty()) {
diff --git a/src/main/java/com/oltpbenchmark/api/Procedure.java b/src/main/java/com/oltpbenchmark/api/Procedure.java
index bc55a958e..548c29d80 100644
--- a/src/main/java/com/oltpbenchmark/api/Procedure.java
+++ b/src/main/java/com/oltpbenchmark/api/Procedure.java
@@ -19,6 +19,7 @@
import com.oltpbenchmark.jdbc.AutoIncrementPreparedStatement;
import com.oltpbenchmark.types.DatabaseType;
+import com.oltpbenchmark.util.MonitoringUtil;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.Connection;
@@ -160,6 +161,19 @@ protected final void loadSQLDialect(StatementDialects dialects) {
}
}
+ /** Enable monitoring for this procedure by adding a monitoring prefixes. */
+ protected final void enabledAdvancedMonitoring() {
+ for (String stmtName : this.getStatements().keySet()) {
+ SQLStmt stmt = this.name_stmt_xref.get(stmtName);
+ LOG.debug("Enabling advanced monitoring for query {}.", stmtName);
+ // Create monitoring prefix.
+ String prefix = MonitoringUtil.getMonitoringMarker();
+ prefix = prefix.replace(MonitoringUtil.getMonitoringQueryId(), stmtName);
+ // Update SQL string.
+ stmt.setSQL(prefix + stmt.getSQL());
+ }
+ }
+
/**
* Hook for testing
*
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/monitoring/DatabaseMonitor.java b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/DatabaseMonitor.java
new file mode 100644
index 000000000..3706d14c5
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/DatabaseMonitor.java
@@ -0,0 +1,301 @@
+package com.oltpbenchmark.api.collectors.monitoring;
+
+import com.oltpbenchmark.BenchmarkState;
+import com.oltpbenchmark.WorkloadConfiguration;
+import com.oltpbenchmark.api.BenchmarkModule;
+import com.oltpbenchmark.api.Worker;
+import com.oltpbenchmark.util.FileUtil;
+import com.oltpbenchmark.util.MonitorInfo;
+import com.oltpbenchmark.util.StringUtil;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.immutables.value.Value;
+
+/** Generic database monitor that consolidates functionality used across DBMS. */
+public abstract class DatabaseMonitor extends Monitor {
+ protected enum DatabaseState {
+ READY,
+ INVALID,
+ TEST
+ };
+
+ protected final String OUTPUT_DIR = "results/monitor";
+ protected final String CSV_DELIMITER = ",";
+ protected final String SINGLE_QUERY_EVENT_CSV = "single_query_event";
+ protected final String REP_QUERY_EVENT_CSV = "repeated_query_event";
+ protected final String REP_SYSTEM_EVENT_CSV = "system_query_event";
+ protected final int FILE_FLUSH_COUNT = 1000; // flush writes to the metrics files every 1000 ms
+
+ protected DatabaseState currentState = DatabaseState.INVALID;
+ protected int ticks = 1;
+
+ protected WorkloadConfiguration conf;
+ protected Connection conn;
+ protected List singleQueryEvents;
+ protected List repeatedQueryEvents;
+ protected List repeatedSystemEvents;
+
+ /**
+ * Builds the connection to the DBMS using the same connection details as the benchmarking
+ * environment.
+ *
+ * @param conf
+ * @return
+ * @throws SQLException
+ */
+ private final Connection makeConnection() throws SQLException {
+ if (StringUtils.isEmpty(conf.getUsername())) {
+ return DriverManager.getConnection(conf.getUrl());
+ } else {
+ return DriverManager.getConnection(conf.getUrl(), conf.getUsername(), conf.getPassword());
+ }
+ }
+
+ public DatabaseMonitor(
+ MonitorInfo monitorInfo,
+ BenchmarkState testState,
+ List extends Worker extends BenchmarkModule>> workers,
+ WorkloadConfiguration workloadConf) {
+ super(monitorInfo, testState, workers);
+
+ try {
+ this.conf = workloadConf;
+ this.conn = makeConnection();
+ } catch (SQLException e) {
+ this.conn = null;
+ LOG.error("Could not initialize connection to create DatabaseMonitor.");
+ LOG.error(e.getMessage());
+ }
+
+ FileUtil.makeDirIfNotExists(OUTPUT_DIR);
+
+ // Initialize event lists.
+ this.singleQueryEvents = new ArrayList<>();
+ this.repeatedQueryEvents = new ArrayList<>();
+ this.repeatedSystemEvents = new ArrayList<>();
+
+ LOG.info("Initialized DatabaseMonitor.");
+ }
+
+ protected void writeSingleQueryEventsToCSV() {
+ String filePath = getFilePath(SINGLE_QUERY_EVENT_CSV, this.ticks);
+ try {
+ if (this.singleQueryEvents.size() == 0) {
+ LOG.warn("No query events have been recorded, file not written.");
+ return;
+ }
+
+ if (Files.deleteIfExists(Paths.get(filePath))) {
+ LOG.warn("File at " + filePath + " deleted before writing query events to file.");
+ }
+ PrintStream out = new PrintStream(filePath);
+ out.println(
+ "QueryId,"
+ + StringUtil.join(",", this.singleQueryEvents.get(0).getPropertyValues().keySet()));
+ for (SingleQueryEvent event : this.singleQueryEvents) {
+ out.println(
+ event.getQueryId()
+ + ","
+ + StringUtil.join(",", this.singleQueryEvents.get(0).getPropertyValues().values()));
+ }
+ out.close();
+ this.singleQueryEvents = new ArrayList<>();
+ LOG.info("Query events written to " + filePath);
+ } catch (IOException e) {
+ LOG.error("Error when writing query events to file.");
+ LOG.error(e.getMessage());
+ }
+ }
+
+ protected void writeRepeatedQueryEventsToCSV() {
+ String filePath = getFilePath(REP_QUERY_EVENT_CSV, this.ticks);
+ try {
+ if (this.repeatedQueryEvents.size() == 0) {
+ LOG.warn("No repeated query events have been recorded, file not written.");
+ return;
+ }
+
+ if (Files.deleteIfExists(Paths.get(filePath))) {
+ LOG.warn("File at " + filePath + " deleted before writing repeated query events to file.");
+ }
+ PrintStream out = new PrintStream(filePath);
+ out.println(
+ "QueryId,Instant,"
+ + StringUtil.join(",", this.repeatedQueryEvents.get(0).getPropertyValues().keySet()));
+ for (RepeatedQueryEvent event : this.repeatedQueryEvents) {
+ out.println(
+ event.getQueryId()
+ + ","
+ + event.getInstant().toString()
+ + ","
+ + StringUtil.join(
+ ",", this.repeatedQueryEvents.get(0).getPropertyValues().values()));
+ }
+ out.close();
+ this.repeatedQueryEvents = new ArrayList<>();
+ LOG.info("Repeated query events written to " + filePath);
+ } catch (IOException e) {
+ LOG.error("Error when writing repeated query events to file.");
+ LOG.error(e.getMessage());
+ }
+ }
+
+ protected void writeRepeatedSystemEventsToCSV() {
+ String filePath = getFilePath(REP_SYSTEM_EVENT_CSV, this.ticks);
+ try {
+ if (this.repeatedSystemEvents.size() == 0) {
+ LOG.warn("No repeated system events have been recorded, file not written.");
+ return;
+ }
+
+ if (Files.deleteIfExists(Paths.get(filePath))) {
+ LOG.warn("File at " + filePath + " deleted before writing repeated system events to file.");
+ }
+ PrintStream out = new PrintStream(filePath);
+ out.println(
+ "Instant,"
+ + StringUtil.join(
+ ",", this.repeatedSystemEvents.get(0).getPropertyValues().keySet()));
+ for (RepeatedSystemEvent event : this.repeatedSystemEvents) {
+ out.println(
+ event.getInstant().toString()
+ + ","
+ + StringUtil.join(
+ ",", this.repeatedSystemEvents.get(0).getPropertyValues().values()));
+ }
+ out.close();
+ this.repeatedSystemEvents = new ArrayList<>();
+ LOG.info("Repeated system events written to " + filePath);
+ } catch (IOException e) {
+ LOG.error("Error when writing repeated system events to file.");
+ LOG.error(e.getMessage());
+ }
+ }
+
+ protected String getFilePath(String filename, int fileCounter) {
+ return FileUtil.joinPath(OUTPUT_DIR, filename + "_" + fileCounter + ".csv");
+ }
+
+ protected void cleanupCache() {
+ try (PreparedStatement stmt = conn.prepareStatement(this.getCleanupStmt())) {
+ stmt.execute();
+ } catch (SQLException sqlError) {
+ LOG.error("Error when cleaning up cached plans.");
+ LOG.error(sqlError.getMessage());
+ }
+ }
+
+ protected void writeQueryMetrics() {
+ this.writeSingleQueryEventsToCSV();
+ this.writeRepeatedQueryEventsToCSV();
+ }
+
+ protected void writeToCSV() {
+ this.writeQueryMetrics();
+ this.writeSystemMetrics();
+ }
+
+ @Value.Immutable
+ public interface SingleQueryEvent {
+
+ /** A string that identifies the query. */
+ String getQueryId();
+
+ /** Mapping of observed properties to their corresponding values. */
+ Map getPropertyValues();
+ }
+
+ @Value.Immutable
+ public interface RepeatedQueryEvent {
+
+ /** A string that identifies the query. */
+ String getQueryId();
+
+ /** The timestamp at which this event was observed. */
+ Instant getInstant();
+
+ /** Mapping of observed properties to their corresponding values. */
+ Map getPropertyValues();
+ }
+
+ @Value.Immutable
+ public interface RepeatedSystemEvent {
+
+ /** The timestamp at which this event was observed. */
+ Instant getInstant();
+
+ /** Mapping of observed properties to their corresponding values. */
+ Map getPropertyValues();
+ }
+
+ protected abstract String getCleanupStmt();
+
+ /** Execute the extraction of desired query and performance metrics. */
+ protected abstract void runExtraction();
+
+ protected abstract void writeSystemMetrics();
+
+ /**
+ * Run monitor. Clean up cache first and do initial extraction, then sleep as defined by the
+ * interval. Per periodic waking phase, extract metrics and potentially write to file (currently
+ * every ~10mins by default). After execution has finished, consolidate logs and clean up cache
+ * again.
+ */
+ @Override
+ public void run() {
+ int interval = this.monitorInfo.getMonitoringInterval();
+
+ LOG.info("Starting Monitor Interval [{}ms]", interval);
+ // Make sure we record one event during setup.
+ if (this.conn != null) {
+ cleanupCache();
+ runExtraction();
+ }
+ // Periodically extract sys table stats.
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException ex) {
+ // Restore interrupt flag.
+ Thread.currentThread().interrupt();
+ }
+ if (this.conn != null) {
+ runExtraction();
+ }
+ if (ticks % FILE_FLUSH_COUNT == 0) {
+ writeToCSV();
+ }
+ ticks++;
+ }
+
+ if (this.conn != null) {
+ cleanupCache();
+ }
+
+ writeToCSV();
+ }
+
+ /** Called at the end of the test to do any clean up that may be required. */
+ @Override
+ public void tearDown() {
+ if (this.conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.error("Connection could not be closed.", e);
+ }
+ this.conn = null;
+ }
+ }
+}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/monitoring/Monitor.java b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/Monitor.java
new file mode 100644
index 000000000..e014f17bd
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/Monitor.java
@@ -0,0 +1,68 @@
+package com.oltpbenchmark.api.collectors.monitoring;
+
+import com.oltpbenchmark.BenchmarkState;
+import com.oltpbenchmark.api.BenchmarkModule;
+import com.oltpbenchmark.api.Worker;
+import com.oltpbenchmark.util.MonitorInfo;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic monitoring class that reports the throughput of the executing workers while the benchmark
+ * is being executed.
+ */
+public class Monitor extends Thread {
+ protected static final Logger LOG = LoggerFactory.getLogger(DatabaseMonitor.class);
+
+ protected final MonitorInfo monitorInfo;
+ protected final BenchmarkState testState;
+ protected final List extends Worker extends BenchmarkModule>> workers;
+
+ {
+ this.setDaemon(true);
+ }
+
+ /**
+ * @param interval How long to wait between polling in milliseconds
+ */
+ Monitor(
+ MonitorInfo monitorInfo,
+ BenchmarkState testState,
+ List extends Worker extends BenchmarkModule>> workers) {
+ this.monitorInfo = monitorInfo;
+ this.testState = testState;
+ this.workers = workers;
+ }
+
+ @Override
+ public void run() {
+ int interval = this.monitorInfo.getMonitoringInterval();
+
+ LOG.info("Starting MonitorThread Interval [{}ms]", interval);
+ while (!Thread.currentThread().isInterrupted()) {
+ // Compute the last throughput
+ long measuredRequests = 0;
+ synchronized (this.testState) {
+ for (Worker> w : this.workers) {
+ measuredRequests += w.getAndResetIntervalRequests();
+ }
+ }
+ double seconds = interval / 1000d;
+ double tps = (double) measuredRequests / seconds;
+ LOG.info("Throughput: {} txn/sec", tps);
+
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException ex) {
+ // Restore interrupt flag.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /** Called at the end of the test to do any clean up that may be required. */
+ public void tearDown() {
+ // nothing to do here
+ }
+}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/monitoring/MonitorGen.java b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/MonitorGen.java
new file mode 100644
index 000000000..7e002634b
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/MonitorGen.java
@@ -0,0 +1,36 @@
+package com.oltpbenchmark.api.collectors.monitoring;
+
+import com.oltpbenchmark.BenchmarkState;
+import com.oltpbenchmark.WorkloadConfiguration;
+import com.oltpbenchmark.api.BenchmarkModule;
+import com.oltpbenchmark.api.Worker;
+import com.oltpbenchmark.util.MonitorInfo;
+import java.util.List;
+
+/**
+ * Monitor generator that picks the appropriate monitoring implemnetation based on the database
+ * type.
+ */
+public class MonitorGen {
+ public static Monitor getMonitor(
+ MonitorInfo monitorInfo,
+ BenchmarkState testState,
+ List extends Worker extends BenchmarkModule>> workers,
+ WorkloadConfiguration conf) {
+ switch (monitorInfo.getMonitoringType()) {
+ case ADVANCED:
+ {
+ switch (conf.getDatabaseType()) {
+ case SQLSERVER:
+ return new SQLServerMonitor(monitorInfo, testState, workers, conf);
+ case POSTGRES:
+ return new PostgreSQLMonitor(monitorInfo, testState, workers, conf);
+ default:
+ return new Monitor(monitorInfo, testState, workers);
+ }
+ }
+ default:
+ return new Monitor(monitorInfo, testState, workers);
+ }
+ }
+}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/monitoring/PostgreSQLMonitor.java b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/PostgreSQLMonitor.java
new file mode 100644
index 000000000..6248450bd
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/PostgreSQLMonitor.java
@@ -0,0 +1,139 @@
+package com.oltpbenchmark.api.collectors.monitoring;
+
+import com.oltpbenchmark.BenchmarkState;
+import com.oltpbenchmark.WorkloadConfiguration;
+import com.oltpbenchmark.api.BenchmarkModule;
+import com.oltpbenchmark.api.Worker;
+import com.oltpbenchmark.util.MonitorInfo;
+import com.oltpbenchmark.util.MonitoringUtil;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+/**
+ * Implementation of a monitor specific to PostgreSQL. Uses the 'pg_stat_statements' add-on to
+ * extract relevant query and system information.
+ */
+public class PostgreSQLMonitor extends DatabaseMonitor {
+
+ private final String PG_STAT_STATEMENTS =
+ """
+ SELECT query AS query_text, calls as execution_count, rows,
+ total_exec_time, min_exec_time, max_exec_time,
+ shared_blks_read, shared_blks_written, local_blks_read,
+ local_blks_written, temp_blks_read, temp_blks_written
+ FROM pg_stat_statements;
+ """;
+ private final String CLEAN_CACHE = "SELECT pg_stat_statements_reset();";
+ private final List repeatedQueryProperties;
+
+ private final Set stored_queries;
+
+ public PostgreSQLMonitor(
+ MonitorInfo monitorInfo,
+ BenchmarkState testState,
+ List extends Worker extends BenchmarkModule>> workers,
+ WorkloadConfiguration conf) {
+ super(monitorInfo, testState, workers, conf);
+
+ this.stored_queries = new HashSet();
+
+ this.repeatedQueryProperties =
+ new ArrayList() {
+ {
+ add("execution_count");
+ add("min_exec_time");
+ add("max_exec_time");
+ add("total_exec_time");
+ add("rows");
+ add("shared_blks_read");
+ add("shared_blks_written");
+ add("local_blks_read");
+ add("local_blks_written");
+ add("temp_blks_read");
+ add("temp_blks_written");
+ }
+ };
+ }
+
+ /**
+ * Extract query events (single and repeated) using the extraction query and properties defined
+ * above.
+ */
+ private void extractQueryMetrics(Instant instant) {
+ ImmutableSingleQueryEvent.Builder singleQueryEventBuilder = ImmutableSingleQueryEvent.builder();
+ ImmutableRepeatedQueryEvent.Builder repeatedQueryEventBuilder =
+ ImmutableRepeatedQueryEvent.builder();
+
+ try (PreparedStatement stmt = conn.prepareStatement(PG_STAT_STATEMENTS)) {
+ ResultSet rs = stmt.executeQuery();
+ while (rs.next()) {
+ // Only store those queries that have monitoring enabled via a
+ // comment in the SQL Server dialect XML.
+ String query_text = rs.getString("query_text");
+ if (!query_text.contains(MonitoringUtil.getMonitoringPrefix())) {
+ continue;
+ }
+ // Get identifier from commment in query text.
+ Matcher m = MonitoringUtil.getMonitoringPattern().matcher(query_text);
+ if (m.find()) {
+ String identifier = m.group("queryId");
+ query_text = m.replaceAll("");
+
+ // Handle one-off query info, may occur when a plan gets
+ // executed for the first time.
+ Map propertyValues;
+ if (!stored_queries.contains(identifier)) {
+ stored_queries.add(identifier);
+
+ singleQueryEventBuilder.queryId(identifier);
+ propertyValues = new HashMap();
+ propertyValues.put("query_text", query_text);
+ singleQueryEventBuilder.propertyValues(propertyValues);
+ this.singleQueryEvents.add(singleQueryEventBuilder.build());
+ }
+
+ // Handle repeated query events.
+ repeatedQueryEventBuilder.queryId(identifier).instant(instant);
+ propertyValues = new HashMap();
+ for (String property : this.repeatedQueryProperties) {
+ String value = rs.getString(property);
+ if (value != null) {
+ propertyValues.put(property, value);
+ }
+ }
+ repeatedQueryEventBuilder.propertyValues(propertyValues);
+ this.repeatedQueryEvents.add(repeatedQueryEventBuilder.build());
+ }
+ }
+ } catch (SQLException sqlError) {
+ LOG.error("Error when extracting per query metrics.");
+ LOG.error(sqlError.getMessage());
+ }
+ }
+
+ @Override
+ protected String getCleanupStmt() {
+ return CLEAN_CACHE;
+ }
+
+ @Override
+ protected void runExtraction() {
+ Instant time = Instant.now();
+
+ extractQueryMetrics(time);
+ }
+
+ @Override
+ protected void writeSystemMetrics() {
+ return;
+ }
+}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/monitoring/README.md b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/README.md
new file mode 100644
index 000000000..5ffb93af0
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/README.md
@@ -0,0 +1,25 @@
+# Monitoring in BenchBase
+
+Monitoring in BenchBase can be enabled using the
+```text
+ -im,--interval-monitor Monitoring Interval in milliseconds
+ -mt,--monitor-type Type of Monitoring (throughput/advanced)
+```
+command line option when executing BenchBase, where the monitoring interval describes the sleeping period of the thread between recording monitoring information.
+We currently support two types of monitoring:
+
+1. Basic throughput monitoring to track the progress while executing a benchmark (`-mt=throughput`), and
+2. monitoring of query and system properties via system tables for both SQLServer and Postgres (`-mt=advanced`).
+ Support for other engines can also be added.
+
+The former is the default setting unless the monitoring type is explicitly set to advanced which will trigger system monitoring if the database type is supported.
+
+Throughput monitoring logs updated throughput values directly to the system output, while advanced monitoring creates csv files recording their findings in folder `results/monitor/`.
+Advanced monitoring collects data for a variety of events, such as one-off information about a query (for example query plans, query text, etc.), repeated information about a query (elapsed time per query execution, worker time, execution count etc.), and repeated system information (cache hits, number of transactions etc.).
+Which events are collected depends on the database system and is customized in corresponding drivers.
+The code for the drivers can be found in package [`src.main.java.com.oltpbenchmark.api.collectors.monitoring`](./../monitoring/).
+
+For advanced monitoring to function with SQLServer, the user needs to have access to the system tables, for Postgres, `pg_stat_statements` needs to be enabled.
+Queries will fail gracefully, i.e., without interrupting the benchmark execution but instead logging an error.
+Note that in either database system, frequent (additional) queries against the DBMS may distort the benchmarking results.
+That is, a high additional query load via frequent pulling of data from the DBMS will incur system load and can potentially block the execution of the actual benchmark queries.
\ No newline at end of file
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/monitoring/SQLServerMonitor.java b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/SQLServerMonitor.java
new file mode 100644
index 000000000..d3d9143f1
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/api/collectors/monitoring/SQLServerMonitor.java
@@ -0,0 +1,241 @@
+package com.oltpbenchmark.api.collectors.monitoring;
+
+import com.oltpbenchmark.BenchmarkState;
+import com.oltpbenchmark.WorkloadConfiguration;
+import com.oltpbenchmark.api.BenchmarkModule;
+import com.oltpbenchmark.api.Worker;
+import com.oltpbenchmark.util.MonitorInfo;
+import com.oltpbenchmark.util.MonitoringUtil;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implementation of a monitor specific to SQLServer. Uses SQLServer's system tables to extract
+ * relevant query and system information.
+ */
+public class SQLServerMonitor extends DatabaseMonitor {
+
+ private final String DM_EXEC_QUERY_STATS =
+ """
+ SELECT q.text AS query_text, st.plan_handle, pl.query_plan,
+ q.text AS query_text, st.plan_handle, pl.query_plan,
+ st.execution_count, st.min_worker_time, st.max_worker_time,
+ st.total_worker_time, st.min_physical_reads, st.max_physical_reads,
+ st.total_physical_reads, st.min_elapsed_time, st.max_elapsed_time,
+ st.total_elapsed_time, st.total_rows, st.min_rows, st.max_rows,
+ st.min_spills, st.max_spills, st.total_spills,
+ st.min_logical_writes, st.max_logical_writes, st.total_logical_writes,
+ st.min_logical_reads, st.max_logical_reads, st.total_logical_reads,
+ st.min_used_grant_kb, st.max_used_grant_kb, st.total_used_grant_kb,
+ st.min_used_threads, st.max_used_threads, st.total_used_threads
+ FROM sys.dm_exec_query_stats st
+ CROSS APPLY sys.dm_exec_sql_text(st.plan_handle) q
+ CROSS APPLY sys.dm_exec_query_plan(st.plan_handle) pl
+ """;
+ private String DM_OS_PERFORMANCE_STATS =
+ """
+ SELECT cntr_value, counter_name
+ FROM sys.dm_os_performance_counters
+ WHERE instance_name='%s';
+ """;
+ private final String CLEAN_CACHE = "DBCC FREEPROCCACHE;";
+ private final Pattern DATABASE_URL_PATTERN = Pattern.compile("database=(?\\S+);");
+
+ private final List singleQueryProperties;
+ private final List repeatedQueryProperties;
+ private final List repeatedSystemProperties;
+
+ private final Set cached_plans;
+
+ public SQLServerMonitor(
+ MonitorInfo monitorInfo,
+ BenchmarkState testState,
+ List extends Worker extends BenchmarkModule>> workers,
+ WorkloadConfiguration conf) {
+ super(monitorInfo, testState, workers, conf);
+
+ // Extract the database instance from url.
+ Matcher m = DATABASE_URL_PATTERN.matcher(conf.getUrl());
+ if (m.find()) {
+ DM_OS_PERFORMANCE_STATS = DM_OS_PERFORMANCE_STATS.formatted(m.group("instanceName"));
+ }
+
+ this.cached_plans = new HashSet();
+
+ this.singleQueryProperties =
+ new ArrayList() {
+ {
+ add("query_plan");
+ add("plan_handle");
+ }
+ };
+
+ this.repeatedQueryProperties =
+ new ArrayList() {
+ {
+ add("execution_count");
+ add("min_worker_time");
+ add("max_worker_time");
+ add("total_worker_time");
+ add("min_physical_reads");
+ add("max_physical_reads");
+ add("total_physical_reads");
+ add("min_elapsed_time");
+ add("max_elapsed_time");
+ add("total_elapsed_time");
+ add("min_rows");
+ add("max_rows");
+ add("total_rows");
+ add("min_spills");
+ add("max_spills");
+ add("total_spills");
+ add("min_logical_writes");
+ add("max_logical_writes");
+ add("total_logical_writes");
+ add("min_logical_reads");
+ add("max_logical_reads");
+ add("total_logical_reads");
+ add("min_used_grant_kb");
+ add("max_used_grant_kb");
+ add("total_used_grant_kb");
+ add("min_used_threads");
+ add("max_used_threads");
+ add("total_used_threads");
+ add("plan_handle");
+ }
+ };
+
+ this.repeatedSystemProperties =
+ new ArrayList() {
+ {
+ add("Data File(s) Size (KB)");
+ add("Transactions/sec");
+ add("Write Transactions/sec");
+ add("Cache Hit Ratio");
+ add("Cache Entries Count");
+ }
+ };
+ }
+
+ /**
+ * Extract query events (single and repeated) using the extraction query and properties defined
+ * above.
+ */
+ private void extractQueryMetrics(Instant instant) {
+ ImmutableSingleQueryEvent.Builder singleQueryEventBuilder = ImmutableSingleQueryEvent.builder();
+ ImmutableRepeatedQueryEvent.Builder repeatedQueryEventBuilder =
+ ImmutableRepeatedQueryEvent.builder();
+
+ try (PreparedStatement stmt = conn.prepareStatement(DM_EXEC_QUERY_STATS)) {
+ ResultSet rs = stmt.executeQuery();
+ while (rs.next()) {
+ // Only store those queries that have monitoring enabled via a
+ // comment in the SQL Server dialect XML.
+ String query_text = rs.getString("query_text");
+ if (!query_text.contains(MonitoringUtil.getMonitoringPrefix())) {
+ continue;
+ }
+
+ // Get identifier from commment in query text.
+ Matcher m = MonitoringUtil.getMonitoringPattern().matcher(query_text);
+ if (m.find()) {
+ String identifier = m.group("queryId");
+ query_text = m.replaceAll("");
+ // Get plan_handle for plan identification.
+ String plan_handle = rs.getString("plan_handle");
+
+ // Handle one-off query information, may occur when a plan gets
+ // executed for the first time.
+ Map propertyValues;
+ if (!cached_plans.contains(plan_handle)) {
+ cached_plans.add(plan_handle);
+
+ singleQueryEventBuilder.queryId(identifier);
+ propertyValues = new HashMap();
+ propertyValues.put("query_text", query_text);
+ // Add single events.
+ for (String property : this.singleQueryProperties) {
+ String value = rs.getString(property);
+ if (value != null) {
+ propertyValues.put(property, value);
+ }
+ }
+ singleQueryEventBuilder.propertyValues(propertyValues);
+ this.singleQueryEvents.add(singleQueryEventBuilder.build());
+ }
+
+ // Handle repeated query events.
+ repeatedQueryEventBuilder.queryId(identifier).instant(instant);
+ propertyValues = new HashMap();
+ for (String property : this.repeatedQueryProperties) {
+ String value = rs.getString(property);
+ if (value != null) {
+ propertyValues.put(property, value);
+ }
+ }
+ repeatedQueryEventBuilder.propertyValues(propertyValues);
+ this.repeatedQueryEvents.add(repeatedQueryEventBuilder.build());
+ }
+ }
+ } catch (SQLException sqlError) {
+ LOG.error("Error when extracting per query measurements.");
+ LOG.error(sqlError.getMessage());
+ }
+ }
+
+ /**
+ * Extract system events using the extraction query and properties defined above, will fail
+ * gracefully to not interrupt benchmarking.
+ */
+ private void extractPerformanceMetrics(Instant instant) {
+ ImmutableRepeatedSystemEvent.Builder repeatedSystemEventBuilder =
+ ImmutableRepeatedSystemEvent.builder();
+ repeatedSystemEventBuilder.instant(instant);
+
+ // Extract OS performance events.
+ Map propertyValues = new HashMap();
+ try (PreparedStatement stmt = conn.prepareStatement(DM_OS_PERFORMANCE_STATS)) {
+ ResultSet rs = stmt.executeQuery();
+ while (rs.next()) {
+ // Add property values.
+ String counter_name = rs.getString("counter_name").trim();
+ if (this.repeatedSystemProperties.contains(counter_name)) {
+ propertyValues.put(counter_name, rs.getString("cntr_value"));
+ }
+ }
+ } catch (SQLException sqlError) {
+ LOG.error("Error when extracting OS metrics from SQL Server.");
+ LOG.error(sqlError.getMessage());
+ }
+ repeatedSystemEventBuilder.propertyValues(propertyValues);
+ this.repeatedSystemEvents.add(repeatedSystemEventBuilder.build());
+ }
+
+ @Override
+ protected String getCleanupStmt() {
+ return CLEAN_CACHE;
+ }
+
+ @Override
+ protected void runExtraction() {
+ Instant time = Instant.now();
+
+ extractQueryMetrics(time);
+ extractPerformanceMetrics(time);
+ }
+
+ @Override
+ protected void writeSystemMetrics() {
+ this.writeRepeatedSystemEventsToCSV();
+ }
+}
diff --git a/src/main/java/com/oltpbenchmark/types/DatabaseType.java b/src/main/java/com/oltpbenchmark/types/DatabaseType.java
index 80a0a396f..805ad810e 100644
--- a/src/main/java/com/oltpbenchmark/types/DatabaseType.java
+++ b/src/main/java/com/oltpbenchmark/types/DatabaseType.java
@@ -33,7 +33,7 @@ public enum DatabaseType {
DB2(true, false),
H2(true, false),
HSQLDB(false, false),
- POSTGRES(false, false, true),
+ POSTGRES(false, false, true, true),
MARIADB(true, false),
MONETDB(false, false),
MYROCKS(true, false),
@@ -45,19 +45,28 @@ public enum DatabaseType {
SPANNER(false, true),
SQLAZURE(true, true, true),
SQLITE(true, false),
- SQLSERVER(true, true, true),
+ SQLSERVER(true, true, true, true),
TIMESTEN(true, false),
PHOENIX(true, true);
DatabaseType(
- boolean escapeNames, boolean includeColNames, boolean loadNeedsUpdateColumnSequence) {
+ boolean escapeNames,
+ boolean includeColNames,
+ boolean loadNeedsUpdateColumnSequence,
+ boolean needsMonitoringPrefix) {
this.escapeNames = escapeNames;
this.includeColNames = includeColNames;
this.loadNeedsUpdateColumnSequence = loadNeedsUpdateColumnSequence;
+ this.needsMonitoringPrefix = needsMonitoringPrefix;
+ }
+
+ DatabaseType(
+ boolean escapeNames, boolean includeColNames, boolean loadNeedsUpdateColumnSequence) {
+ this(escapeNames, includeColNames, loadNeedsUpdateColumnSequence, false);
}
DatabaseType(boolean escapeNames, boolean includeColNames) {
- this(escapeNames, includeColNames, false);
+ this(escapeNames, includeColNames, false, false);
}
/** If this flag is set to true, then the framework will escape names in the INSERT queries */
@@ -75,6 +84,9 @@ public enum DatabaseType {
*/
private final boolean loadNeedsUpdateColumnSequence;
+ /** If this flag is set to true, the framework will add a monitoring prefix to each query. */
+ private final boolean needsMonitoringPrefix;
+
// ---------------------------------------------------------------
// ACCESSORS
// ----------------------------------------------------------------
@@ -102,6 +114,13 @@ public boolean shouldUpdateColumnSequenceAfterLoad() {
return (this.loadNeedsUpdateColumnSequence);
}
+ /**
+ * @return True if the framework should add a monitoring prefix to each query.
+ */
+ public boolean shouldCreateMonitoringPrefix() {
+ return (this.needsMonitoringPrefix);
+ }
+
// ----------------------------------------------------------------
// STATIC METHODS + MEMBERS
// ----------------------------------------------------------------
diff --git a/src/main/java/com/oltpbenchmark/util/MonitorInfo.java b/src/main/java/com/oltpbenchmark/util/MonitorInfo.java
new file mode 100644
index 000000000..41f8bdb3f
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/util/MonitorInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2020 by OLTPBenchmark Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package com.oltpbenchmark.util;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface MonitorInfo {
+
+ public enum MonitoringType {
+ ADVANCED,
+ THROUGHPUT;
+ }
+
+ /** Monitoring interval. */
+ @Value.Default
+ public default int getMonitoringInterval() {
+ return 0;
+ }
+
+ /** Monitoring type. */
+ @Value.Default
+ public default MonitoringType getMonitoringType() {
+ return MonitoringType.THROUGHPUT;
+ }
+}
diff --git a/src/main/java/com/oltpbenchmark/util/MonitoringUtil.java b/src/main/java/com/oltpbenchmark/util/MonitoringUtil.java
new file mode 100644
index 000000000..8339e4da3
--- /dev/null
+++ b/src/main/java/com/oltpbenchmark/util/MonitoringUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2020 by OLTPBenchmark Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package com.oltpbenchmark.util;
+
+import java.util.regex.Pattern;
+
+public abstract class MonitoringUtil {
+ private static final Pattern MONITORING_PATTERN =
+ Pattern.compile("/[*] MONITOR-(?\\S+) [*]/");
+ private static final String MONITORING_MARKER = "/* MONITOR-$queryId */";
+ private static final String MONITORING_PREFIX = "/* MONITOR-";
+ private static final String MONITORING_QUERYID = "$queryId";
+
+ /** Universal monitoring prefix. */
+ public static Pattern getMonitoringPattern() {
+ return MonitoringUtil.MONITORING_PATTERN;
+ }
+
+ /** Get monitoring marker. */
+ public static String getMonitoringMarker() {
+ return MonitoringUtil.MONITORING_MARKER;
+ }
+
+ /** Get monitoring identifier. */
+ public static String getMonitoringPrefix() {
+ return MonitoringUtil.MONITORING_PREFIX;
+ }
+
+ /** Query identifier in monitoring prefix. */
+ public static String getMonitoringQueryId() {
+ return MonitoringUtil.MONITORING_QUERYID;
+ }
+}
diff --git a/src/main/resources/benchmarks/tpch/dialect-sqlserver.xml b/src/main/resources/benchmarks/tpch/dialect-sqlserver.xml
index f788bf803..cd849d91d 100644
--- a/src/main/resources/benchmarks/tpch/dialect-sqlserver.xml
+++ b/src/main/resources/benchmarks/tpch/dialect-sqlserver.xml
@@ -87,4 +87,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/resources/benchmarks/twitter/dialect-sqlserver.xml b/src/main/resources/benchmarks/twitter/dialect-sqlserver.xml
index 966e724f6..282d9e92f 100644
--- a/src/main/resources/benchmarks/twitter/dialect-sqlserver.xml
+++ b/src/main/resources/benchmarks/twitter/dialect-sqlserver.xml
@@ -13,7 +13,7 @@
SELECT * FROM "tweets" WHERE uid IN (??)
-
+
SELECT TOP 20 f2 FROM "followers" WHERE f1 = ?
@@ -28,4 +28,4 @@
-
+