Add TPC-H Benchmarks #139


Merged: 36 commits, merged on Feb 19, 2021

Changes from all commits

Commits (36)
0c176e3
logic decoupling in TPCH.scala for easier benchmarking
octaviansima Jan 29, 2021
6bb5880
added TPCHBenchmark.scala
octaviansima Jan 29, 2021
4d5808c
Benchmark.scala rewrite
octaviansima Jan 29, 2021
18bfe56
done adding all support TPC-H query benchmarks
octaviansima Jan 29, 2021
7e4aa5f
Merge branch 'master' into tpch-benchmark
octaviansima Feb 1, 2021
470bd71
changed commandline arguments that benchmark takes
octaviansima Feb 1, 2021
8c68442
TPCHBenchmark takes in parameters
octaviansima Feb 1, 2021
f8cd8e4
fixed issue with spark conf
octaviansima Feb 1, 2021
085ce7a
size error handling, --help flag
octaviansima Feb 2, 2021
867aad4
add Utils.force, break cluster mode
octaviansima Feb 3, 2021
e505066
comment out logistic regression benchmark
octaviansima Feb 3, 2021
4417824
ensureCached right before temp view created/replaced
octaviansima Feb 3, 2021
4d261c6
upgrade to 3.0.1
octaviansima Feb 3, 2021
679c2d5
upgrade to 3.0.1
octaviansima Feb 3, 2021
39fbf13
Merge branch 'spark-3.0.1' into tpch-benchmark
octaviansima Feb 3, 2021
b7f8251
Merge branch 'tpch-benchmark' of github.com:octaviansima/opaque into …
octaviansima Feb 3, 2021
c0ab7cf
10 scale factor
octaviansima Feb 3, 2021
eabcdda
persistData
octaviansima Feb 4, 2021
3c29146
almost done refactor
octaviansima Feb 4, 2021
7347c3f
more cleanup
octaviansima Feb 4, 2021
3c90688
compiles
octaviansima Feb 4, 2021
62fbea2
9 passes
octaviansima Feb 4, 2021
0f43be3
cleanup
octaviansima Feb 4, 2021
1a82045
collect instead of force, sf_none
octaviansima Feb 4, 2021
2960312
remove sf_none
octaviansima Feb 5, 2021
26003b0
defaultParallelism
octaviansima Feb 6, 2021
17f82fa
Merge branch 'master' into tpch-benchmark
octaviansima Feb 8, 2021
6201a71
Merge branch 'tpch-benchmark' of github.com:octaviansima/opaque into …
octaviansima Feb 8, 2021
02ac318
no removing trailing/leading whitespace
octaviansima Feb 8, 2021
7f02fa4
add sf_med
octaviansima Feb 9, 2021
4bfcb9e
hdfs works in local case
octaviansima Feb 9, 2021
fc49497
cleanup, added new CLI argument
octaviansima Feb 9, 2021
156adf9
added newly supported tpch queries
octaviansima Feb 9, 2021
577d2cc
Merge branch 'master' into tpch-benchmark
octaviansima Feb 19, 2021
6197b43
function for running all supported tests
octaviansima Feb 19, 2021
0dbf790
Merge branch 'tpch-benchmark' of github.com:octaviansima/opaque into …
octaviansima Feb 19, 2021
102 changes: 92 additions & 10 deletions src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala
@@ -24,35 +24,117 @@ import org.apache.spark.sql.SparkSession
* Convenient runner for benchmarks.
*
* To run locally, use
* `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark <flags>'`.
* Available flags:
* --num-partitions: specify the number of partitions the data should be split into.
* Default: 2 * number of executors if that value exists, 4 otherwise
* --size: specify the size of the dataset that should be loaded into Spark.
* Default: sf_small
* --operations: select the different operations that should be benchmarked.
* Default: all
* Available operations: logistic-regression, tpc-h
* Syntax: --operations "logistic-regression,tpc-h"
* --run-local: whether to write data to the local filesystem instead of HDFS
* Default: HDFS
* Omit the --operations flag to run all benchmarks
*
* To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments.
*/
object Benchmark {

val spark = SparkSession.builder()
.appName("Benchmark")
.getOrCreate()
var numPartitions = spark.sparkContext.defaultParallelism
var size = "sf_med"

// Configure your HDFS namenode url here
var fileUrl = "hdfs://10.0.3.4:8020"

def dataDir: String = {
if (System.getenv("SPARKSGX_DATA_DIR") == null) {
throw new Exception("Set SPARKSGX_DATA_DIR")
}
System.getenv("SPARKSGX_DATA_DIR")
}

def logisticRegression() = {
// TODO: this fails when Spark is run on a cluster
/*
// Warmup
LogisticRegression.train(spark, Encrypted, 1000, 1)
LogisticRegression.train(spark, Encrypted, 1000, 1)

// Run
LogisticRegression.train(spark, Insecure, 100000, 1)
LogisticRegression.train(spark, Encrypted, 100000, 1)
*/
}

def runAll() = {
logisticRegression()
TPCHBenchmark.run(spark.sqlContext, numPartitions, size, fileUrl)
}

def main(args: Array[String]): Unit = {
Collaborator: I wonder if we should have a help argument here? Also, once this benchmark is set up, it might be good to put some text in the README about this benchmark.

Utils.initSQLContext(spark.sqlContext)

if (args.length >= 2 && args(1) == "--help") {
println(
"""Available flags:
--num-partitions: specify the number of partitions the data should be split into.
Default: 2 * number of executors if that value exists, 4 otherwise
--size: specify the size of the dataset that should be loaded into Spark.
Default: sf_small
--operations: select the different operations that should be benchmarked.
Default: all
Available operations: logistic-regression, tpc-h
Syntax: --operations "logistic-regression,tpc-h"
Omit the --operations flag to run all benchmarks
--run-local: whether to write data to the local filesystem instead of HDFS
Default: HDFS"""
)
}

var runAll = true
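// Skip args(0) (the benchmark class name passed through `build/sbt run`) and
// consume the remaining arguments in (flag, value) pairs.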
args.slice(1, args.length).sliding(2, 2).toList.collect {
case Array("--num-partitions", numPartitions: String) => {
this.numPartitions = numPartitions.toInt
}
case Array("--size", size: String) => {
val supportedSizes = Set("sf_small", "sf_med")
if (supportedSizes.contains(size)) {
this.size = size
} else {
println("Given size is not supported: available values are " + supportedSizes.toString())
}
}
case Array("--run-local", runLocal: String) => {
runLocal match {
case "true" => {
fileUrl = "file://"
}
case _ => {}
}
}
case Array("--operations", operations: String) => {
runAll = false
val operationsArr = operations.split(",").map(_.trim)
for (operation <- operationsArr) {
operation match {
case "logistic-regression" => {
logisticRegression()
}
Comment on lines +125 to +127
Collaborator: Did you make sure that this actually runs? I remember there were some errors that caused this to fail a while ago...
octaviansima (author), Feb 2, 2021: It runs locally in Spark but not in a cluster, going to comment it out and add a TODO for now.

case "tpc-h" => {
TPCHBenchmark.run(spark.sqlContext, numPartitions, size, fileUrl)
}
}
}
}
}
if (runAll) {
this.runAll();
}
spark.stop()
}
}
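For reference, a local invocation combining the flags documented above might look like the following (the flag values are illustrative):

$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark --num-partitions 4 --size sf_small --operations "tpc-h" --run-local true'

Passing --run-local true switches fileUrl from the configured HDFS namenode to file://, per the parsing code above.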
84 changes: 58 additions & 26 deletions src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala
@@ -17,6 +17,7 @@

package edu.berkeley.cs.rise.opaque.benchmark

import java.io.File
import scala.io.Source

import org.apache.spark.sql.DataFrame
@@ -162,7 +163,7 @@ object TPCH {
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/customer.tbl")

def generateDFs(
sqlContext: SQLContext, size: String)
: Map[String, DataFrame] = {
Map("part" -> part(sqlContext, size),
@@ -175,42 +176,73 @@
"customer" -> customer(sqlContext, size)
)
}

}

class TPCH(val sqlContext: SQLContext, val size: String, val fileUrl: String) {

val tableNames = TPCH.tableNames
val nameToDF = TPCH.generateDFs(sqlContext, size)

private var numPartitions: Int = -1
private var nameToPath = Map[String, File]()
private var nameToEncryptedPath = Map[String, File]()

def getQuery(queryNumber: Int) : String = {
val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/"
Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n")
}

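// Materialize every table to disk at both security levels for the requested
// partition count, dropping files from any previous partitioning; queries then
// read these saved copies back rather than re-encrypting DataFrames in memory.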
def generateFiles(numPartitions: Int) = {
if (numPartitions != this.numPartitions) {
this.numPartitions = numPartitions
for ((name, df) <- nameToDF) {
nameToPath.get(name).foreach{ path => Utils.deleteRecursively(path) }

nameToPath += (name -> createPath(df, Insecure, numPartitions))
nameToEncryptedPath += (name -> createPath(df, Encrypted, numPartitions))
}
}
}

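// Repartition the table, apply the security level, and save it under a fresh
// temp directory; the directory is deleted first so that DataFrameWriter.save
// sees a nonexistent path.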
private def createPath(df: DataFrame, securityLevel: SecurityLevel, numPartitions: Int): File = {
val partitionedDF = securityLevel.applyTo(df.repartition(numPartitions))
val path = Utils.createTempDir()
path.delete()
securityLevel match {
case Insecure => {
partitionedDF.write.format("com.databricks.spark.csv")
.option("ignoreLeadingWhiteSpace", false)
.option("ignoreTrailingWhiteSpace", false)
.save(fileUrl + path.toString)
}
case Encrypted => {
partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(fileUrl + path.toString)
}
}
path
}

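// Re-register one temp view per table from the files written by generateFiles:
// plaintext tables come back through the CSV reader and encrypted tables through
// Opaque's EncryptedSource, each with its original schema re-applied.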
private def loadViews(securityLevel: SecurityLevel) = {
val (map, formatStr) = if (securityLevel == Insecure)
(nameToPath, "com.databricks.spark.csv") else
(nameToEncryptedPath, "edu.berkeley.cs.rise.opaque.EncryptedSource")
for ((name, path) <- map) {
val df = sqlContext.sparkSession.read
.format(formatStr)
.schema(nameToDF.get(name).get.schema)
.load(fileUrl + path.toString)
df.createOrReplaceTempView(name)
}
}

def performQuery(sqlStr: String, securityLevel: SecurityLevel): DataFrame = {
loadViews(securityLevel)
sqlContext.sparkSession.sql(sqlStr)
}

def query(queryNumber: Int, securityLevel: SecurityLevel, numPartitions: Int): DataFrame = {
val sqlStr = getQuery(queryNumber)
generateFiles(numPartitions)
performQuery(sqlStr, securityLevel)
}
}
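For illustration, a minimal sketch of how the refactored TPCH class is driven, using the API from the diff above (the scale factor, query number, and partition count are hypothetical):

// Assumes SPARKSGX_DATA_DIR points at the TPC-H tables and OPAQUE_HOME at the query files.
val tpch = new TPCH(spark.sqlContext, "sf_small", "file://")
// generateFiles runs implicitly inside query: tables are written out once for
// this partition count, then q6 executes against the re-loaded temp views.
val result = tpch.query(6, Encrypted, 4)
result.collect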
57 changes: 57 additions & 0 deletions src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edu.berkeley.cs.rise.opaque.benchmark

import edu.berkeley.cs.rise.opaque.Utils

import org.apache.spark.sql.SQLContext

object TPCHBenchmark {

// Add query numbers here once they are supported
val supportedQueries = Seq(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 19, 20, 22)

def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = {
val sqlStr = tpch.getQuery(queryNumber)
tpch.generateFiles(numPartitions)
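// Each supported query is timed twice, once per security level; collect() forces
// the whole plan to execute so the timings cover the full query.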

Utils.timeBenchmark(
"distributed" -> (numPartitions > 1),
"query" -> s"TPC-H $queryNumber",
"system" -> Insecure.name) {

tpch.performQuery(sqlStr, Insecure).collect
}

Utils.timeBenchmark(
"distributed" -> (numPartitions > 1),
"query" -> s"TPC-H $queryNumber",
"system" -> Encrypted.name) {

tpch.performQuery(sqlStr, Encrypted).collect
}
}

def run(sqlContext: SQLContext, numPartitions: Int, size: String, fileUrl: String) = {
val tpch = new TPCH(sqlContext, size, fileUrl)

for (queryNumber <- supportedQueries) {
query(queryNumber, tpch, sqlContext, numPartitions)
}
}
}
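A minimal sketch of driving the new suite directly, mirroring what Benchmark.main does for the "tpc-h" operation (the partition count, size, and file URL are illustrative):

// Run all supported TPC-H queries locally at scale factor sf_small.
TPCHBenchmark.run(spark.sqlContext, numPartitions = 4, size = "sf_small", fileUrl = "file://")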