From 0c176e3e87a508951ac744482dff30c069ea028b Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 29 Jan 2021 18:15:14 +0000 Subject: [PATCH 01/29] logic decoupling in TPCH.scala for easier benchmarking --- .../berkeley/cs/rise/opaque/benchmark/TPCH.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index e0bb4d4caf..5e61fe1f97 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -205,12 +205,19 @@ class TPCH(val sqlContext: SQLContext, val size: String) { } } - def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : DataFrame = { + def getQuery(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : String = { setupViews(securityLevel, numPartitions) val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/" - val sqlStr = Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") + Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") + } - sqlContext.sparkSession.sql(sqlStr) + def performQuery(sqlContext: SQLContext, sqlStr: String) : DataFrame = { + sqlContext.sparkSession.sql(sqlStr); + } + + def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : DataFrame = { + val sqlStr = getQuery(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) + performQuery(sqlContext, sqlStr) } } From 6bb5880f46586b4fa96bdfff992bfbcdc31183dd Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 29 Jan 2021 18:21:24 +0000 Subject: [PATCH 02/29] added TPCHBenchmark.scala --- .../rise/opaque/benchmark/TPCHBenchmark.Scala | 38 +++++++++++++++++++ 
.../rise/opaque/benchmark/TPCHBenchmark.scala | 38 +++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala create mode 100644 src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala new file mode 100644 index 0000000000..4438103fb8 --- /dev/null +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.berkeley.cs.rise.opaque.benchmark + +import edu.berkeley.cs.rise.opaque.Utils +import org.apache.spark.sql.SparkSession + +/** + * To run locally, use + * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.TPCHBenchmark'`. + * + * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. 
+ */ +object TPCHBenchmark { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .appName("TPCHBenchmark") + .getOrCreate() + Utils.initSQLContext(spark.sqlContext) + + spark.stop() + } +} \ No newline at end of file diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala new file mode 100644 index 0000000000..4438103fb8 --- /dev/null +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.berkeley.cs.rise.opaque.benchmark + +import edu.berkeley.cs.rise.opaque.Utils +import org.apache.spark.sql.SparkSession + +/** + * To run locally, use + * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.TPCHBenchmark'`. + * + * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. 
+ */ +object TPCHBenchmark { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .appName("TPCHBenchmark") + .getOrCreate() + Utils.initSQLContext(spark.sqlContext) + + spark.stop() + } +} \ No newline at end of file From 4d5808c6681e1f8e3aa379458fbff643fae500e7 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 29 Jan 2021 18:57:52 +0000 Subject: [PATCH 03/29] Benchmark.scala rewrite --- .../cs/rise/opaque/benchmark/Benchmark.scala | 36 +++++++++++++----- .../rise/opaque/benchmark/TPCHBenchmark.Scala | 38 ------------------- .../rise/opaque/benchmark/TPCHBenchmark.scala | 9 +---- 3 files changed, 28 insertions(+), 55 deletions(-) delete mode 100644 src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index b46a94d00c..907cd40cd8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -29,6 +29,11 @@ import org.apache.spark.sql.SparkSession * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. 
*/ object Benchmark { + + val spark = SparkSession.builder() + .appName("Benchmark") + .getOrCreate() + def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { throw new Exception("Set SPARKSGX_DATA_DIR") @@ -36,15 +41,7 @@ object Benchmark { System.getenv("SPARKSGX_DATA_DIR") } - def main(args: Array[String]): Unit = { - val spark = SparkSession.builder() - .appName("QEDBenchmark") - .getOrCreate() - Utils.initSQLContext(spark.sqlContext) - - // val numPartitions = - // if (spark.sparkContext.isLocal) 1 else spark.sparkContext.defaultParallelism - + def logisticRegression() = { // Warmup LogisticRegression.train(spark, Encrypted, 1000, 1) LogisticRegression.train(spark, Encrypted, 1000, 1) @@ -52,7 +49,28 @@ object Benchmark { // Run LogisticRegression.train(spark, Insecure, 100000, 1) LogisticRegression.train(spark, Encrypted, 100000, 1) + } + def runAll() = { + logisticRegression() + } + + def main(args: Array[String]): Unit = { + Utils.initSQLContext(spark.sqlContext) + + if (args.size == 1) { + runAll() + } else { + for (arg <- args) { + println(arg) + arg match { + case "logistic_regression" => { + logisticRegression() + } + case _ => null + } + } + } spark.stop() } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala deleted file mode 100644 index 4438103fb8..0000000000 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.Scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package edu.berkeley.cs.rise.opaque.benchmark - -import edu.berkeley.cs.rise.opaque.Utils -import org.apache.spark.sql.SparkSession - -/** - * To run locally, use - * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.TPCHBenchmark'`. - * - * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. - */ -object TPCHBenchmark { - def main(args: Array[String]): Unit = { - val spark = SparkSession.builder() - .appName("TPCHBenchmark") - .getOrCreate() - Utils.initSQLContext(spark.sqlContext) - - spark.stop() - } -} \ No newline at end of file diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 4438103fb8..e1fb5b2a58 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -17,7 +17,6 @@ package edu.berkeley.cs.rise.opaque.benchmark -import edu.berkeley.cs.rise.opaque.Utils import org.apache.spark.sql.SparkSession /** @@ -27,12 +26,6 @@ import org.apache.spark.sql.SparkSession * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. 
*/ object TPCHBenchmark { - def main(args: Array[String]): Unit = { - val spark = SparkSession.builder() - .appName("TPCHBenchmark") - .getOrCreate() - Utils.initSQLContext(spark.sqlContext) - - spark.stop() + def run(spark: SparkSession) = { } } \ No newline at end of file From 18bfe5612c265478107120298a845229da1894ff Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 29 Jan 2021 19:46:46 +0000 Subject: [PATCH 04/29] done adding all support TPC-H query benchmarks --- .../cs/rise/opaque/benchmark/Benchmark.scala | 7 ++- .../cs/rise/opaque/benchmark/TPCH.scala | 7 ++- .../rise/opaque/benchmark/TPCHBenchmark.scala | 44 +++++++++++++++---- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 907cd40cd8..644bda594c 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession * Convenient runner for benchmarks. * * To run locally, use - * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark'`. + * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark '`. * * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. 
*/ @@ -53,6 +53,7 @@ object Benchmark { def runAll() = { logisticRegression() + TPCHBenchmark.run(spark.sqlContext) } def main(args: Array[String]): Unit = { @@ -62,11 +63,13 @@ object Benchmark { runAll() } else { for (arg <- args) { - println(arg) arg match { case "logistic_regression" => { logisticRegression() } + case "tpch" => { + TPCHBenchmark.run(spark.sqlContext) + } case _ => null } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 5e61fe1f97..89fb73f1cf 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -205,9 +205,7 @@ class TPCH(val sqlContext: SQLContext, val size: String) { } } - def getQuery(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : String = { - setupViews(securityLevel, numPartitions) - + def getQuery(queryNumber: Int) : String = { val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/" Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") } @@ -217,7 +215,8 @@ class TPCH(val sqlContext: SQLContext, val size: String) { } def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : DataFrame = { - val sqlStr = getQuery(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) + setupViews(securityLevel, numPartitions) + val sqlStr = getQuery(queryNumber) performQuery(sqlContext, sqlStr) } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index e1fb5b2a58..c6ece87fc4 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -17,15 +17,43 @@ package 
edu.berkeley.cs.rise.opaque.benchmark -import org.apache.spark.sql.SparkSession +import edu.berkeley.cs.rise.opaque.Utils + +import org.apache.spark.sql.SQLContext -/** - * To run locally, use - * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.TPCHBenchmark'`. - * - * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. - */ object TPCHBenchmark { - def run(spark: SparkSession) = { + def size = "sf_small" + + def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { + val sqlStr = tpch.getQuery(queryNumber) + + tpch.setupViews(Insecure, numPartitions) + Utils.timeBenchmark( + "distributed" -> (numPartitions > 1), + "query" -> s"TPC-H $queryNumber", + "system" -> Insecure.name) { + + tpch.performQuery(sqlContext, sqlStr) + } + + tpch.setupViews(Encrypted, numPartitions) + Utils.timeBenchmark( + "distributed" -> (numPartitions > 1), + "query" -> s"TPC-H $queryNumber", + "system" -> Encrypted.name) { + + tpch.performQuery(sqlContext, sqlStr) + } + } + + def run(sqlContext: SQLContext) = { + val tpch = TPCH(sqlContext, size) + + val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 14, 17) + + for (queryNumber <- supportedQueries) { + query(queryNumber, tpch, sqlContext, 1) + query(queryNumber, tpch, sqlContext, 3) + } } } \ No newline at end of file From 470bd71af61e4a8a108518a8619af00801f60537 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Mon, 1 Feb 2021 19:16:11 +0000 Subject: [PATCH 05/29] changed commandline arguments that benchmark takes --- .../cs/rise/opaque/benchmark/Benchmark.scala | 54 ++++++++++++++----- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 644bda594c..d78af16217 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ 
b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -24,15 +24,30 @@ import org.apache.spark.sql.SparkSession * Convenient runner for benchmarks. * * To run locally, use - * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark '`. + * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark '`. + * Available flags: + * --num-partitions: specify the number of partitions the data should be split into. + * Default: 2 * number of executors + * --size: specify the size of the dataset that should be loaded into Spark. + * Default: sf_small + * --operations: select the different operations that should be benchmarked. + * Default: all + * Available operations: logistic-regression, tpc-h + * Syntax: --operations "logistic-regression,tpc-h" + * Leave --operations flag blank to run all benchmarks * * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. */ object Benchmark { val spark = SparkSession.builder() - .appName("Benchmark") - .getOrCreate() + .appName("Benchmark") + .getOrCreate() + var numPartitions = 2 * spark.sparkContext + .getConf + .get("spark.executor.instances") + .toInt + var size = "sf_small" def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { @@ -59,21 +74,32 @@ object Benchmark { def main(args: Array[String]): Unit = { Utils.initSQLContext(spark.sqlContext) - if (args.size == 1) { - runAll() - } else { - for (arg <- args) { - arg match { - case "logistic_regression" => { - logisticRegression() - } - case "tpch" => { - TPCHBenchmark.run(spark.sqlContext) + var runAll = true + args.sliding(2, 2).toList.collect { + case Array("--num-partitions", numPartitions: String) => { + this.numPartitions = numPartitions.toInt + } + case Array("--size", size: String) => { + this.size = size + } + case Array("--operations", operations: String) => { + runAll = false + val operationsArr = operations.split(",").map(_.trim) + for (operation <- operationsArr) 
{ + operation match { + case "logistic-regression" => { + logisticRegression() + } + case "tpc-h" => { + TPCHBenchmark.run(spark.sqlContext) + } } - case _ => null } } } + if (runAll) { + this.runAll(); + } spark.stop() } } From 8c68442cfd8587cb5a225a48e53d758ee509d3e9 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Mon, 1 Feb 2021 19:25:48 +0000 Subject: [PATCH 06/29] TPCHBenchmark takes in parameters --- .../berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 4 ++-- .../cs/rise/opaque/benchmark/TPCHBenchmark.scala | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index d78af16217..6518926a1b 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -68,7 +68,7 @@ object Benchmark { def runAll() = { logisticRegression() - TPCHBenchmark.run(spark.sqlContext) + TPCHBenchmark.run(spark.sqlContext, numPartitions, size) } def main(args: Array[String]): Unit = { @@ -91,7 +91,7 @@ object Benchmark { logisticRegression() } case "tpc-h" => { - TPCHBenchmark.run(spark.sqlContext) + TPCHBenchmark.run(spark.sqlContext, numPartitions, size) } } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index c6ece87fc4..6733ffc296 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -22,8 +22,6 @@ import edu.berkeley.cs.rise.opaque.Utils import org.apache.spark.sql.SQLContext object TPCHBenchmark { - def size = "sf_small" - def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { val sqlStr = tpch.getQuery(queryNumber) @@ -46,14 +44,13 @@ object TPCHBenchmark { } } 
- def run(sqlContext: SQLContext) = { + def run(sqlContext: SQLContext, numPartitions: Int, size: String) = { val tpch = TPCH(sqlContext, size) val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 14, 17) for (queryNumber <- supportedQueries) { - query(queryNumber, tpch, sqlContext, 1) - query(queryNumber, tpch, sqlContext, 3) + query(queryNumber, tpch, sqlContext, numPartitions) } } -} \ No newline at end of file +} From f8cd8e46b52a61535451fee185cfe011449a26d5 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Mon, 1 Feb 2021 19:52:42 +0000 Subject: [PATCH 07/29] fixed issue with spark conf --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 6518926a1b..f1cf385808 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.SparkSession * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark '`. * Available flags: * --num-partitions: specify the number of partitions the data should be split into. - * Default: 2 * number of executors + * Default: 2 * number of executors if exists, 4 otherwise * --size: specify the size of the dataset that should be loaded into Spark. * Default: sf_small * --operations: select the different operations that should be benchmarked. 
@@ -45,8 +45,7 @@ object Benchmark { .getOrCreate() var numPartitions = 2 * spark.sparkContext .getConf - .get("spark.executor.instances") - .toInt + .getInt("spark.executor.instances", 2) var size = "sf_small" def dataDir: String = { From 085ce7ada6f733d8ccea6a43b2eede1a16bf062e Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Tue, 2 Feb 2021 22:01:19 +0000 Subject: [PATCH 08/29] size error handling, --help flag --- .../cs/rise/opaque/benchmark/Benchmark.scala | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index f1cf385808..2973b00a92 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -73,13 +73,33 @@ object Benchmark { def main(args: Array[String]): Unit = { Utils.initSQLContext(spark.sqlContext) + if (args.length >= 2 && args(1) == "--help") { + println( +"""Available flags: + --num-partitions: specify the number of partitions the data should be split into. + Default: 2 * number of executors if exists, 4 otherwise + --size: specify the size of the dataset that should be loaded into Spark. + Default: sf_small + --operations: select the different operations that should be benchmarked. 
+ Default: all + Available operations: logistic-regression, tpc-h + Syntax: --operations "logistic-regression,tpc-h" + Leave --operations flag blank to run all benchmarks""" + ) + } + var runAll = true - args.sliding(2, 2).toList.collect { + args.slice(1, args.length).sliding(2, 2).toList.collect { case Array("--num-partitions", numPartitions: String) => { this.numPartitions = numPartitions.toInt } case Array("--size", size: String) => { - this.size = size + val supportedSizes = Set("sf_small") + if (supportedSizes.contains(size)) { + this.size = size + } else { + println("Given size is not supported: available values are " + supportedSizes.toString()) + } } case Array("--operations", operations: String) => { runAll = false From 867aad41b247c6d3e3076ce94711392c16e7f332 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 3 Feb 2021 01:11:32 +0000 Subject: [PATCH 09/29] add Utils.force, break cluster mode --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 2 +- .../berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 2973b00a92..2d9b3a2fbc 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -66,7 +66,7 @@ object Benchmark { } def runAll() = { - logisticRegression() + // logisticRegression() TPCHBenchmark.run(spark.sqlContext, numPartitions, size) } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 6733ffc296..795c88514b 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -31,7 +31,9 @@ object 
TPCHBenchmark { "query" -> s"TPC-H $queryNumber", "system" -> Insecure.name) { - tpch.performQuery(sqlContext, sqlStr) + val df = tpch.performQuery(sqlContext, sqlStr) + Utils.force(df) + df } tpch.setupViews(Encrypted, numPartitions) @@ -40,7 +42,9 @@ object TPCHBenchmark { "query" -> s"TPC-H $queryNumber", "system" -> Encrypted.name) { - tpch.performQuery(sqlContext, sqlStr) + val df = tpch.performQuery(sqlContext, sqlStr) + Utils.force(df) + df } } From e50506686b8c0ee3c33699f0b4400dd7e03fa233 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 3 Feb 2021 01:44:11 +0000 Subject: [PATCH 10/29] comment out logistic regression benchmark --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 2d9b3a2fbc..37cd5c16f6 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -56,6 +56,8 @@ object Benchmark { } def logisticRegression() = { + // TODO: this fails when Spark is ran on a cluster + /* // Warmup LogisticRegression.train(spark, Encrypted, 1000, 1) LogisticRegression.train(spark, Encrypted, 1000, 1) @@ -63,10 +65,11 @@ object Benchmark { // Run LogisticRegression.train(spark, Insecure, 100000, 1) LogisticRegression.train(spark, Encrypted, 100000, 1) + */ } def runAll() = { - // logisticRegression() + logisticRegression() TPCHBenchmark.run(spark.sqlContext, numPartitions, size) } From 4417824d3a580867cba48eb54e6c711c993f02e1 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 3 Feb 2021 19:46:50 +0000 Subject: [PATCH 11/29] ensureCached right before temp view created/replaced --- .../edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git 
a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 89fb73f1cf..6c6e6187f3 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -180,7 +180,6 @@ object TPCH { val tpch = new TPCH(sqlContext, size) tpch.tableNames = tableNames tpch.nameToDF = generateMap(sqlContext, size) - tpch.ensureCached() tpch } } @@ -190,18 +189,9 @@ class TPCH(val sqlContext: SQLContext, val size: String) { var tableNames : Seq[String] = Seq() var nameToDF : Map[String, DataFrame] = Map() - def ensureCached() = { - for (name <- tableNames) { - nameToDF.get(name).foreach(df => { - Utils.ensureCached(df) - Utils.ensureCached(Encrypted.applyTo(df)) - }) - } - } - def setupViews(securityLevel: SecurityLevel, numPartitions: Int) = { for ((name, df) <- nameToDF) { - securityLevel.applyTo(df.repartition(numPartitions)).createOrReplaceTempView(name) + Utils.ensureCached(securityLevel.applyTo(df.repartition(numPartitions))).createOrReplaceTempView(name) } } From 4d261c6640755a8b2a17d551865dbea733b85c43 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 3 Feb 2021 21:10:44 +0000 Subject: [PATCH 12/29] upgrade to 3.0.1 --- build.sbt | 2 +- src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 95abea0b39..43d6751f41 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ scalaVersion := "2.12.10" spName := "amplab/opaque" -sparkVersion := "3.0.0" +sparkVersion := "3.0.1" sparkComponents ++= Seq("core", "sql", "catalyst") diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 46c5325a8b..9ba226f1a8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1164,7 +1164,7 @@ 
object Utils extends Logging { // To avoid the need for special handling of the grouping columns, we transform the grouping expressions // into AggregateExpressions that collect the first seen value. val aggGroupingExpressions = groupingExpressions.map { - case e: NamedExpression => AggregateExpression(First(e, Literal(false)), Complete, false) + case e: NamedExpression => AggregateExpression(First(e, false), Complete, false) } val aggregateExpressions = aggGroupingExpressions ++ aggExpressions @@ -1199,6 +1199,7 @@ object Utils extends Logging { input: Seq[Attribute], aggSchema: Seq[Attribute], concatSchema: Seq[Attribute]): Int = { + println(e.aggregateFunction) (e.aggregateFunction: @unchecked) match { case avg @ Average(child) => @@ -1293,7 +1294,7 @@ object Utils extends Logging { evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) - case f @ First(child, Literal(false, BooleanType)) => + case f @ First(child, false) => val first = f.aggBufferAttributes(0) val valueSet = f.aggBufferAttributes(1) @@ -1331,7 +1332,7 @@ object Utils extends Logging { builder, evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) - case l @ Last(child, Literal(false, BooleanType)) => + case l @ Last(child, false) => val last = l.aggBufferAttributes(0) val valueSet = l.aggBufferAttributes(1) From 679c2d5823b524159f1482ff3a62cffb7d6b1135 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 3 Feb 2021 21:18:24 +0000 Subject: [PATCH 13/29] upgrade to 3.0.1 --- README.md | 2 +- build.sbt | 2 +- src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 10d1f5094f..0d0d5b9348 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ After downloading the Opaque codebase, build and test it as follows. 
## Usage -Next, run Apache Spark SQL queries with Opaque as follows, assuming [Spark 3.0](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz) (`wget http://apache.mirrors.pair.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz`) is already installed: +Next, run Apache Spark SQL queries with Opaque as follows, assuming [Spark 3.0.1](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz) (`wget http://apache.mirrors.pair.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz`) is already installed: 1. Package Opaque into a JAR: diff --git a/build.sbt b/build.sbt index 95abea0b39..43d6751f41 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ scalaVersion := "2.12.10" spName := "amplab/opaque" -sparkVersion := "3.0.0" +sparkVersion := "3.0.1" sparkComponents ++= Seq("core", "sql", "catalyst") diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 5a85154253..641223a62d 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1170,7 +1170,7 @@ object Utils extends Logging { // To avoid the need for special handling of the grouping columns, we transform the grouping expressions // into AggregateExpressions that collect the first seen value. 
val aggGroupingExpressions = groupingExpressions.map { - case e: NamedExpression => AggregateExpression(First(e, Literal(false)), Complete, false) + case e: NamedExpression => AggregateExpression(First(e, false), Complete, false) } val aggregateExpressions = aggGroupingExpressions ++ aggExpressions @@ -1299,7 +1299,7 @@ object Utils extends Logging { evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) - case f @ First(child, Literal(false, BooleanType)) => + case f @ First(child, false) => val first = f.aggBufferAttributes(0) val valueSet = f.aggBufferAttributes(1) @@ -1337,7 +1337,7 @@ object Utils extends Logging { builder, evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) - case l @ Last(child, Literal(false, BooleanType)) => + case l @ Last(child, false) => val last = l.aggBufferAttributes(0) val valueSet = l.aggBufferAttributes(1) From c0ab7cfb3bd4db8772f61e691e1e299e31b76a6c Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 3 Feb 2021 23:27:27 +0000 Subject: [PATCH 14/29] 10 scale factor --- data/tpch/synth-tpch-data | 2 +- .../berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/data/tpch/synth-tpch-data b/data/tpch/synth-tpch-data index 929a1760a9..6f58b344d4 100755 --- a/data/tpch/synth-tpch-data +++ b/data/tpch/synth-tpch-data @@ -8,7 +8,7 @@ rm -rf tpch-dbgen git clone https://github.com/electrum/tpch-dbgen cd tpch-dbgen make -j$(nproc) -./dbgen -vf -s 0.01 +./dbgen -vf -s 10 mkdir -p $SPARKSGX_DATA_DIR/tpch/sf_small chmod u+r *.tbl cp *.tbl $SPARKSGX_DATA_DIR/tpch/sf_small/ diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 795c88514b..0c22ca1221 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ 
b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -25,22 +25,22 @@ object TPCHBenchmark { def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { val sqlStr = tpch.getQuery(queryNumber) - tpch.setupViews(Insecure, numPartitions) + tpch.setupViews(Encrypted, numPartitions) Utils.timeBenchmark( "distributed" -> (numPartitions > 1), "query" -> s"TPC-H $queryNumber", - "system" -> Insecure.name) { + "system" -> Encrypted.name) { val df = tpch.performQuery(sqlContext, sqlStr) Utils.force(df) df } - tpch.setupViews(Encrypted, numPartitions) + tpch.setupViews(Insecure, numPartitions) Utils.timeBenchmark( "distributed" -> (numPartitions > 1), "query" -> s"TPC-H $queryNumber", - "system" -> Encrypted.name) { + "system" -> Insecure.name) { val df = tpch.performQuery(sqlContext, sqlStr) Utils.force(df) From eabcddad3e378addbc697861c96455a17de8ed52 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 00:24:05 +0000 Subject: [PATCH 15/29] persistData --- .../cs/rise/opaque/benchmark/TPCH.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 6c6e6187f3..0f14ca5c88 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -17,6 +17,7 @@ package edu.berkeley.cs.rise.opaque.benchmark +import java.io.File import scala.io.Source import org.apache.spark.sql.DataFrame @@ -189,10 +190,22 @@ class TPCH(val sqlContext: SQLContext, val size: String) { var tableNames : Seq[String] = Seq() var nameToDF : Map[String, DataFrame] = Map() - def setupViews(securityLevel: SecurityLevel, numPartitions: Int) = { + def persistData(securityLevel: SecurityLevel, numPartitions: Int): Seq[File] = { + var paths = Seq[File]() for ((name, df) <- nameToDF) { - 
Utils.ensureCached(securityLevel.applyTo(df.repartition(numPartitions))).createOrReplaceTempView(name) + val partitionedDF = df.repartition(numPartitions) + val path = Utils.createTempDir() + paths = path +: paths + securityLevel match { + case Insecure => { + partitionedDF.write.format("com.databricks.spark.csv").save(path.toString) + } + case Encrypted => { + partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + } + } } + paths } def getQuery(queryNumber: Int) : String = { @@ -205,7 +218,7 @@ class TPCH(val sqlContext: SQLContext, val size: String) { } def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : DataFrame = { - setupViews(securityLevel, numPartitions) + persistData(securityLevel, numPartitions) val sqlStr = getQuery(queryNumber) performQuery(sqlContext, sqlStr) } From 3c2914656be7c26d4d02778763380fc9d903881f Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 01:21:37 +0000 Subject: [PATCH 16/29] almost done refactor --- .../cs/rise/opaque/benchmark/TPCH.scala | 62 +++++++++++-------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 0f14ca5c88..73b04d03f5 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -163,7 +163,7 @@ object TPCH { .option("delimiter", "|") .load(s"${Benchmark.dataDir}/tpch/$size/customer.tbl") - def generateMap( + def generateDFs( sqlContext: SQLContext, size: String) : Map[String, DataFrame] = { Map("part" -> part(sqlContext, size), @@ -177,35 +177,47 @@ object TPCH { ), } - def apply(sqlContext: SQLContext, size: String) : TPCH = { + + def apply(sqlContext: SQLContext, size: String, numPartitions: Int) : TPCH = { val tpch = new TPCH(sqlContext, size) - tpch.tableNames = tableNames 
- tpch.nameToDF = generateMap(sqlContext, size) + tpch.generateFiles(tpch.numPartitions) tpch } } class TPCH(val sqlContext: SQLContext, val size: String) { - var tableNames : Seq[String] = Seq() - var nameToDF : Map[String, DataFrame] = Map() - - def persistData(securityLevel: SecurityLevel, numPartitions: Int): Seq[File] = { - var paths = Seq[File]() - for ((name, df) <- nameToDF) { - val partitionedDF = df.repartition(numPartitions) - val path = Utils.createTempDir() - paths = path +: paths - securityLevel match { - case Insecure => { - partitionedDF.write.format("com.databricks.spark.csv").save(path.toString) - } - case Encrypted => { - partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - } + val tableNames = TPCH.tableNames + val nameToDF = TPCH.generateDFs(sqlContext, size) + + var numPartitions: Int = 1 + var nameToPath : Map[String, File] = Map() + var nameToEncryptedPath : Map[String, File] = Map() + + def generateFiles(numPartitions: Int) = { + if (numPartitions != this.numPartitions) { + this.numPartitions = numPartitions + for ((name, df) <- nameToDF) { + nameToPath.get(name).foreach{ path => Utils.deleteRecursively(path) } + + nameToPath = nameToPath + (name -> createPath(df, Insecure, numPartitions)) + nameToEncryptedPath = nameToEncryptedPath + (name -> createPath(df, Encrypted, numPartitions)) + } + } + } + + private def createPath(df: DataFrame, securityLevel: SecurityLevel, numPartitions: Int): File = { + val partitionedDF = df.repartition(numPartitions) + val path = Utils.createTempDir() + securityLevel match { + case Insecure => { + partitionedDF.write.format("com.databricks.spark.csv").save(path.toString) + } + case Encrypted => { + partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) } } - paths + path } def getQuery(queryNumber: Int) : String = { @@ -213,13 +225,13 @@ class TPCH(val sqlContext: SQLContext, val size: String) { Source.fromFile(queryLocation + 
s"q$queryNumber.sql").getLines().mkString("\n") } - def performQuery(sqlContext: SQLContext, sqlStr: String) : DataFrame = { + def performQuery(sqlContext: SQLContext, sqlStr: String, securityLevel: SecurityLevel): DataFrame = { sqlContext.sparkSession.sql(sqlStr); } - def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : DataFrame = { - persistData(securityLevel, numPartitions) + def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int): DataFrame = { + generateFiles(numPartitions) val sqlStr = getQuery(queryNumber) - performQuery(sqlContext, sqlStr) + performQuery(sqlContext, sqlStr, securityLevel) } } From 7347c3fe6d56d1c217798443d113c01c352b1820 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 01:29:25 +0000 Subject: [PATCH 17/29] more cleanup --- .../cs/rise/opaque/benchmark/TPCH.scala | 8 ++-- .../berkeley/cs/rise/opaque/TPCHTests.scala | 44 +++++++++---------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 73b04d03f5..561f4ff16c 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -178,7 +178,7 @@ object TPCH { } - def apply(sqlContext: SQLContext, size: String, numPartitions: Int) : TPCH = { + def apply(sqlContext: SQLContext, size: String) : TPCH = { val tpch = new TPCH(sqlContext, size) tpch.generateFiles(tpch.numPartitions) tpch @@ -225,13 +225,13 @@ class TPCH(val sqlContext: SQLContext, val size: String) { Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") } - def performQuery(sqlContext: SQLContext, sqlStr: String, securityLevel: SecurityLevel): DataFrame = { + def performQuery(sqlStr: String, securityLevel: SecurityLevel): DataFrame = { 
sqlContext.sparkSession.sql(sqlStr); } - def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int): DataFrame = { + def query(queryNumber: Int, securityLevel: SecurityLevel, numPartitions: Int): DataFrame = { generateFiles(numPartitions) val sqlStr = getQuery(queryNumber) - performQuery(sqlContext, sqlStr, securityLevel) + performQuery(sqlStr, securityLevel) } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index d003c835f3..3413c3c126 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -29,91 +29,91 @@ trait TPCHTests extends OpaqueTestsBase { self => def tpch = TPCH(spark.sqlContext, size) testAgainstSpark("TPC-H 1") { securityLevel => - tpch.query(1, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(1, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 2", ignore) { securityLevel => - tpch.query(2, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(2, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 3") { securityLevel => - tpch.query(3, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(3, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 4", ignore) { securityLevel => - tpch.query(4, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(4, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 5") { securityLevel => - tpch.query(5, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(5, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 6") { securityLevel => - tpch.query(6, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(6, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 7") { securityLevel 
=> - tpch.query(7, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(7, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 8") { securityLevel => - tpch.query(8, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(8, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 9") { securityLevel => - tpch.query(9, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(9, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 10") { securityLevel => - tpch.query(10, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(10, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 11", ignore) { securityLevel => - tpch.query(11, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(11, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 12", ignore) { securityLevel => - tpch.query(12, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(12, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 13", ignore) { securityLevel => - tpch.query(13, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(13, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 14") { securityLevel => - tpch.query(14, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(14, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 15", ignore) { securityLevel => - tpch.query(15, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(15, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 16", ignore) { securityLevel => - tpch.query(16, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(16, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 17") { securityLevel => - tpch.query(17, securityLevel, spark.sqlContext, 
numPartitions).collect.toSet + tpch.query(17, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 18", ignore) { securityLevel => - tpch.query(18, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(18, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 19", ignore) { securityLevel => - tpch.query(19, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(19, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 20", ignore) { securityLevel => - tpch.query(20, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(20, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 21", ignore) { securityLevel => - tpch.query(21, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(21, securityLevel, numPartitions).collect.toSet } testAgainstSpark("TPC-H 22", ignore) { securityLevel => - tpch.query(22, securityLevel, spark.sqlContext, numPartitions).collect.toSet + tpch.query(22, securityLevel, numPartitions).collect.toSet } } From 3c90688f65cfcd1971e3a45f96ca560893be5070 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 01:37:27 +0000 Subject: [PATCH 18/29] compiles --- .../cs/rise/opaque/benchmark/TPCH.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 561f4ff16c..b1f1994fe8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -225,13 +225,26 @@ class TPCH(val sqlContext: SQLContext, val size: String) { Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") } + def loadViews(securityLevel: SecurityLevel) = { + val (map, formatStr) = if (securityLevel == Insecure) + (nameToPath, "com.databricks.spark.csv") else + 
(nameToEncryptedPath, "edu.berkeley.cs.rise.opaque.EncryptedSource") + for ((name, path) <- map) { + val df = sqlContext.sparkSession.read + .format("edu.berkeley.cs.rise.opaque.EncryptedSource") + .load(path.toString) + df.createOrReplaceTempView(name) + } + } + def performQuery(sqlStr: String, securityLevel: SecurityLevel): DataFrame = { - sqlContext.sparkSession.sql(sqlStr); + loadViews(securityLevel) + sqlContext.sparkSession.sql(sqlStr) } def query(queryNumber: Int, securityLevel: SecurityLevel, numPartitions: Int): DataFrame = { - generateFiles(numPartitions) val sqlStr = getQuery(queryNumber) + generateFiles(numPartitions) performQuery(sqlStr, securityLevel) } } From 62fbea22a073e17ec30ea9b14a480c478ebd9253 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 02:14:45 +0000 Subject: [PATCH 19/29] 9 passes --- .../berkeley/cs/rise/opaque/benchmark/TPCH.scala | 16 +++++++++------- .../cs/rise/opaque/benchmark/TPCHBenchmark.scala | 7 +++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index b1f1994fe8..9f51ac8b7f 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -190,9 +190,9 @@ class TPCH(val sqlContext: SQLContext, val size: String) { val tableNames = TPCH.tableNames val nameToDF = TPCH.generateDFs(sqlContext, size) - var numPartitions: Int = 1 - var nameToPath : Map[String, File] = Map() - var nameToEncryptedPath : Map[String, File] = Map() + var numPartitions: Int = -1 + var nameToPath = scala.collection.mutable.Map[String, File]() + var nameToEncryptedPath = scala.collection.mutable.Map[String, File]() def generateFiles(numPartitions: Int) = { if (numPartitions != this.numPartitions) { @@ -200,15 +200,16 @@ class TPCH(val sqlContext: SQLContext, val size: String) { for ((name, df) <- nameToDF) { 
nameToPath.get(name).foreach{ path => Utils.deleteRecursively(path) } - nameToPath = nameToPath + (name -> createPath(df, Insecure, numPartitions)) - nameToEncryptedPath = nameToEncryptedPath + (name -> createPath(df, Encrypted, numPartitions)) + nameToPath += (name -> createPath(df, Insecure, numPartitions)) + nameToEncryptedPath += (name -> createPath(df, Encrypted, numPartitions)) } } } private def createPath(df: DataFrame, securityLevel: SecurityLevel, numPartitions: Int): File = { - val partitionedDF = df.repartition(numPartitions) + val partitionedDF = securityLevel.applyTo(df.repartition(numPartitions)) val path = Utils.createTempDir() + path.delete() securityLevel match { case Insecure => { partitionedDF.write.format("com.databricks.spark.csv").save(path.toString) @@ -231,7 +232,8 @@ class TPCH(val sqlContext: SQLContext, val size: String) { (nameToEncryptedPath, "edu.berkeley.cs.rise.opaque.EncryptedSource") for ((name, path) <- map) { val df = sqlContext.sparkSession.read - .format("edu.berkeley.cs.rise.opaque.EncryptedSource") + .format(formatStr) + .schema(nameToDF.get(name).get.schema) .load(path.toString) df.createOrReplaceTempView(name) } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 795c88514b..8ceb743eed 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -24,25 +24,24 @@ import org.apache.spark.sql.SQLContext object TPCHBenchmark { def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { val sqlStr = tpch.getQuery(queryNumber) + tpch.generateFiles(numPartitions) - tpch.setupViews(Insecure, numPartitions) Utils.timeBenchmark( "distributed" -> (numPartitions > 1), "query" -> s"TPC-H $queryNumber", "system" -> Insecure.name) { - val df = tpch.performQuery(sqlContext, sqlStr) + val df = 
tpch.performQuery(sqlStr, Insecure) Utils.force(df) df } - tpch.setupViews(Encrypted, numPartitions) Utils.timeBenchmark( "distributed" -> (numPartitions > 1), "query" -> s"TPC-H $queryNumber", "system" -> Encrypted.name) { - val df = tpch.performQuery(sqlContext, sqlStr) + val df = tpch.performQuery(sqlStr, Encrypted) Utils.force(df) df } From 0f43be356336ad798ef73ac7892550122b835f86 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 02:22:15 +0000 Subject: [PATCH 20/29] cleanup --- .../cs/rise/opaque/benchmark/TPCH.scala | 25 +++++++------------ .../rise/opaque/benchmark/TPCHBenchmark.scala | 2 +- .../berkeley/cs/rise/opaque/TPCHTests.scala | 2 +- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 9f51ac8b7f..87c1d836b2 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -176,13 +176,6 @@ object TPCH { "customer" -> customer(sqlContext, size) ), } - - - def apply(sqlContext: SQLContext, size: String) : TPCH = { - val tpch = new TPCH(sqlContext, size) - tpch.generateFiles(tpch.numPartitions) - tpch - } } class TPCH(val sqlContext: SQLContext, val size: String) { @@ -190,9 +183,14 @@ class TPCH(val sqlContext: SQLContext, val size: String) { val tableNames = TPCH.tableNames val nameToDF = TPCH.generateDFs(sqlContext, size) - var numPartitions: Int = -1 - var nameToPath = scala.collection.mutable.Map[String, File]() - var nameToEncryptedPath = scala.collection.mutable.Map[String, File]() + private var numPartitions: Int = -1 + private var nameToPath = Map[String, File]() + private var nameToEncryptedPath = Map[String, File]() + + def getQuery(queryNumber: Int) : String = { + val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/" + Source.fromFile(queryLocation + 
s"q$queryNumber.sql").getLines().mkString("\n") + } def generateFiles(numPartitions: Int) = { if (numPartitions != this.numPartitions) { @@ -221,12 +219,7 @@ class TPCH(val sqlContext: SQLContext, val size: String) { path } - def getQuery(queryNumber: Int) : String = { - val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/" - Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") - } - - def loadViews(securityLevel: SecurityLevel) = { + private def loadViews(securityLevel: SecurityLevel) = { val (map, formatStr) = if (securityLevel == Insecure) (nameToPath, "com.databricks.spark.csv") else (nameToEncryptedPath, "edu.berkeley.cs.rise.opaque.EncryptedSource") diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 8ceb743eed..12e027de90 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -48,7 +48,7 @@ object TPCHBenchmark { } def run(sqlContext: SQLContext, numPartitions: Int, size: String) = { - val tpch = TPCH(sqlContext, size) + val tpch = new TPCH(sqlContext, size) val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 14, 17) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index 3413c3c126..45bdf84248 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -26,7 +26,7 @@ import edu.berkeley.cs.rise.opaque.benchmark.TPCH trait TPCHTests extends OpaqueTestsBase { self => def size = "sf_small" - def tpch = TPCH(spark.sqlContext, size) + def tpch = new TPCH(spark.sqlContext, size) testAgainstSpark("TPC-H 1") { securityLevel => tpch.query(1, securityLevel, numPartitions).collect.toSet From 
1a82045f67000e8f2752099c86fb9765f789966c Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 4 Feb 2021 16:51:59 +0000 Subject: [PATCH 21/29] collect instead of force, sf_none --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 4 ++-- .../berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 37cd5c16f6..937222f8ba 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -46,7 +46,7 @@ object Benchmark { var numPartitions = 2 * spark.sparkContext .getConf .getInt("spark.executor.instances", 2) - var size = "sf_small" + var size = "sf_none" def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { @@ -97,7 +97,7 @@ object Benchmark { this.numPartitions = numPartitions.toInt } case Array("--size", size: String) => { - val supportedSizes = Set("sf_small") + val supportedSizes = Set("sf_small", "sf_none") if (supportedSizes.contains(size)) { this.size = size } else { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 12e027de90..0520636cec 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -31,9 +31,7 @@ object TPCHBenchmark { "query" -> s"TPC-H $queryNumber", "system" -> Insecure.name) { - val df = tpch.performQuery(sqlStr, Insecure) - Utils.force(df) - df + tpch.performQuery(sqlStr, Insecure).collect } Utils.timeBenchmark( @@ -41,9 +39,7 @@ object TPCHBenchmark { "query" -> s"TPC-H $queryNumber", "system" -> Encrypted.name) { - val df = tpch.performQuery(sqlStr, Encrypted) - Utils.force(df) 
- df + tpch.performQuery(sqlStr, Encrypted).collect } } From 29603125f63134d8b12bba0a67928a3fe7b46dff Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 5 Feb 2021 16:57:41 +0000 Subject: [PATCH 22/29] remove sf_none --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 937222f8ba..37cd5c16f6 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -46,7 +46,7 @@ object Benchmark { var numPartitions = 2 * spark.sparkContext .getConf .getInt("spark.executor.instances", 2) - var size = "sf_none" + var size = "sf_small" def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { @@ -97,7 +97,7 @@ object Benchmark { this.numPartitions = numPartitions.toInt } case Array("--size", size: String) => { - val supportedSizes = Set("sf_small", "sf_none") + val supportedSizes = Set("sf_small") if (supportedSizes.contains(size)) { this.size = size } else { From 26003b095f8199a2c2f4301bc73533cf40f73da3 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Sat, 6 Feb 2021 01:05:16 +0000 Subject: [PATCH 23/29] defaultParallelism --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 37cd5c16f6..9a0550dccb 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -43,9 +43,7 @@ object Benchmark { val spark = SparkSession.builder() .appName("Benchmark") .getOrCreate() - var numPartitions = 2 * spark.sparkContext - .getConf - 
.getInt("spark.executor.instances", 2) + var numPartitions = spark.sparkContext.defaultParallelism var size = "sf_small" def dataDir: String = { From 02ac3188f0938f98fc1cc730c78d03994da0fd09 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Mon, 8 Feb 2021 23:02:47 +0000 Subject: [PATCH 24/29] no removing trailing/leading whitespace --- .../scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 87c1d836b2..971d9fcaf8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -210,7 +210,10 @@ class TPCH(val sqlContext: SQLContext, val size: String) { path.delete() securityLevel match { case Insecure => { - partitionedDF.write.format("com.databricks.spark.csv").save(path.toString) + partitionedDF.write.format("com.databricks.spark.csv") + .option("ignoreLeadingWhiteSpace", false) + .option("ignoreTrailingWhiteSpace", false) + .save(path.toString) } case Encrypted => { partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) From 7f02fa4e92eb1ab86e9d16a8e283ee21fe75bcb4 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Tue, 9 Feb 2021 17:03:40 +0000 Subject: [PATCH 25/29] add sf_med --- .../edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index 9a0550dccb..b9e20832d8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -44,7 +44,7 @@ object Benchmark { .appName("Benchmark") .getOrCreate() var numPartitions = 
spark.sparkContext.defaultParallelism - var size = "sf_small" + var size = "sf_med" def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { @@ -95,7 +95,7 @@ this.numPartitions = numPartitions.toInt } case Array("--size", size: String) => { - val supportedSizes = Set("sf_small") + val supportedSizes = Set("sf_small", "sf_med") if (supportedSizes.contains(size)) { this.size = size } else { From 4bfcb9e87ac35a466642fe363ded2f470ff50bc5 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Tue, 9 Feb 2021 21:19:21 +0000 Subject: [PATCH 26/29] hdfs works in local case --- .../scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala | 6 +++--- .../berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 971d9fcaf8..9afa181c69 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -213,10 +213,10 @@ class TPCH(val sqlContext: SQLContext, val size: String) { partitionedDF.write.format("com.databricks.spark.csv") .option("ignoreLeadingWhiteSpace", false) .option("ignoreTrailingWhiteSpace", false) - .save(path.toString) + .save("hdfs://10.0.3.4:8020/" + path.toString) } case Encrypted => { - partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save("hdfs://10.0.3.4:8020" + path.toString) } } path @@ -230,7 +230,7 @@ class TPCH(val sqlContext: SQLContext, val size: String) { val df = sqlContext.sparkSession.read .format(formatStr) .schema(nameToDF.get(name).get.schema) - .load(path.toString) + .load("hdfs://10.0.3.4:8020" + path.toString) df.createOrReplaceTempView(name) } } diff --git 
a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index c84de5414c..0520636cec 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -29,7 +29,7 @@ object TPCHBenchmark { Utils.timeBenchmark( "distributed" -> (numPartitions > 1), "query" -> s"TPC-H $queryNumber", - "system" -> Encrypted.name) { + "system" -> Insecure.name) { tpch.performQuery(sqlStr, Insecure).collect } @@ -37,7 +37,7 @@ object TPCHBenchmark { Utils.timeBenchmark( "distributed" -> (numPartitions > 1), "query" -> s"TPC-H $queryNumber", - "system" -> Insecure.name) { + "system" -> Encrypted.name) { tpch.performQuery(sqlStr, Encrypted).collect } From fc494971725236ee24dcfef1984fdbbd868a1cc8 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Tue, 9 Feb 2021 22:43:18 +0000 Subject: [PATCH 27/29] cleanup, added new CLI argument --- .../cs/rise/opaque/benchmark/Benchmark.scala | 21 ++++++++++++++++--- .../cs/rise/opaque/benchmark/TPCH.scala | 8 +++---- .../rise/opaque/benchmark/TPCHBenchmark.scala | 4 ++-- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index b9e20832d8..13c4d288a3 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.SparkSession * Default: all * Available operations: logistic-regression, tpc-h * Syntax: --operations "logistic-regression,tpc-h" + * --run-local: boolean whether to use HDFS or the local filesystem + * Default: HDFS * Leave --operations flag blank to run all benchmarks * * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. 
@@ -46,6 +48,9 @@ object Benchmark { var numPartitions = spark.sparkContext.defaultParallelism var size = "sf_med" + // Configure your HDFS namenode url here + var fileUrl = "hdfs://10.0.3.4:8020" + def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { throw new Exception("Set SPARKSGX_DATA_DIR") @@ -68,7 +73,7 @@ object Benchmark { def runAll() = { logisticRegression() - TPCHBenchmark.run(spark.sqlContext, numPartitions, size) + TPCHBenchmark.run(spark.sqlContext, numPartitions, size, fileUrl) } def main(args: Array[String]): Unit = { @@ -85,7 +90,9 @@ object Benchmark { Default: all Available operations: logistic-regression, tpc-h Syntax: --operations "logistic-regression,tpc-h" - Leave --operations flag blank to run all benchmarks""" + Leave --operations flag blank to run all benchmarks + --run-local: boolean whether to use HDFS or the local filesystem + Default: HDFS""" ) } @@ -102,6 +109,14 @@ object Benchmark { println("Given size is not supported: available values are " + supportedSizes.toString()) } } + case Array("--run-local", runLocal: String) => { + runLocal match { + case "true" => { + fileUrl = "file://" + } + case _ => {} + } + } case Array("--operations", operations: String) => { runAll = false val operationsArr = operations.split(",").map(_.trim) @@ -111,7 +126,7 @@ object Benchmark { logisticRegression() } case "tpc-h" => { - TPCHBenchmark.run(spark.sqlContext, numPartitions, size) + TPCHBenchmark.run(spark.sqlContext, numPartitions, size, fileUrl) } } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index 9afa181c69..ee905026c8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -178,7 +178,7 @@ object TPCH { } } -class TPCH(val sqlContext: SQLContext, val size: String) { +class TPCH(val sqlContext: SQLContext, val size: String, val 
fileUrl: String) { val tableNames = TPCH.tableNames val nameToDF = TPCH.generateDFs(sqlContext, size) @@ -213,10 +213,10 @@ class TPCH(val sqlContext: SQLContext, val size: String) { partitionedDF.write.format("com.databricks.spark.csv") .option("ignoreLeadingWhiteSpace", false) .option("ignoreTrailingWhiteSpace", false) - .save("hdfs://10.0.3.4:8020/" + path.toString) + .save(fileUrl + path.toString) } case Encrypted => { - partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save("hdfs://10.0.3.4:8020" + path.toString) + partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(fileUrl + path.toString) } } path @@ -230,7 +230,7 @@ class TPCH(val sqlContext: SQLContext, val size: String) { val df = sqlContext.sparkSession.read .format(formatStr) .schema(nameToDF.get(name).get.schema) - .load("hdfs://10.0.3.4:8020" + path.toString) + .load(fileUrl + path.toString) df.createOrReplaceTempView(name) } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 0520636cec..2cd5a70b86 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -43,8 +43,8 @@ object TPCHBenchmark { } } - def run(sqlContext: SQLContext, numPartitions: Int, size: String) = { - val tpch = new TPCH(sqlContext, size) + def run(sqlContext: SQLContext, numPartitions: Int, size: String, fileUrl: String) = { + val tpch = new TPCH(sqlContext, size, fileUrl) val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 14, 17) From 156adf98910d961035fcb9fc74ef76d27ec07b88 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Tue, 9 Feb 2021 22:50:07 +0000 Subject: [PATCH 28/29] added newly supported tpch queries --- .../edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala | 2 +- src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala | 2 +- 2 files 
changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index 2cd5a70b86..52994914a4 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -46,7 +46,7 @@ object TPCHBenchmark { def run(sqlContext: SQLContext, numPartitions: Int, size: String, fileUrl: String) = { val tpch = new TPCH(sqlContext, size, fileUrl) - val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 14, 17) + val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 12, 14, 17, 19, 20) for (queryNumber <- supportedQueries) { query(queryNumber, tpch, sqlContext, numPartitions) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index 4983dba621..f532613117 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -26,7 +26,7 @@ import edu.berkeley.cs.rise.opaque.benchmark.TPCH trait TPCHTests extends OpaqueTestsBase { self => def size = "sf_small" - def tpch = new TPCH(spark.sqlContext, size) + def tpch = new TPCH(spark.sqlContext, size, "file://") testAgainstSpark("TPC-H 1") { securityLevel => tpch.query(1, securityLevel, numPartitions).collect From 6197b4353288766b3af68083acfcebf11b0b988c Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 19 Feb 2021 01:41:30 +0000 Subject: [PATCH 29/29] function for running all supported tests --- .../rise/opaque/benchmark/TPCHBenchmark.scala | 6 +- .../berkeley/cs/rise/opaque/TPCHTests.scala | 90 +++---------------- 2 files changed, 15 insertions(+), 81 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index c84de5414c..d422fe4c7d 
100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -22,6 +22,10 @@ import edu.berkeley.cs.rise.opaque.Utils import org.apache.spark.sql.SQLContext object TPCHBenchmark { + + // Add query numbers here once they are supported + val supportedQueries = Seq(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 19, 20, 22) + def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { val sqlStr = tpch.getQuery(queryNumber) tpch.generateFiles(numPartitions) @@ -46,8 +50,6 @@ object TPCHBenchmark { def run(sqlContext: SQLContext, numPartitions: Int, size: String) = { val tpch = new TPCH(sqlContext, size) - val supportedQueries = Seq(1, 3, 5, 6, 7, 8, 9, 10, 14, 17) - for (queryNumber <- supportedQueries) { query(queryNumber, tpch, sqlContext, numPartitions) } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index ef987c40bc..aa0bb02eac 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -21,91 +21,19 @@ package edu.berkeley.cs.rise.opaque import org.apache.spark.sql.SparkSession import edu.berkeley.cs.rise.opaque.benchmark._ -import edu.berkeley.cs.rise.opaque.benchmark.TPCH trait TPCHTests extends OpaqueTestsBase { self => def size = "sf_small" def tpch = new TPCH(spark.sqlContext, size) - testAgainstSpark("TPC-H 1") { securityLevel => - tpch.query(1, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 2", ignore) { securityLevel => - tpch.query(2, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 3") { securityLevel => - tpch.query(3, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 4") { securityLevel => - tpch.query(4, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 5") { 
securityLevel => - tpch.query(5, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 6") { securityLevel => - tpch.query(6, securityLevel, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 7") { securityLevel => - tpch.query(7, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 8") { securityLevel => - tpch.query(8, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 9") { securityLevel => - tpch.query(9, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 10") { securityLevel => - tpch.query(10, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 11") { securityLevel => - tpch.query(11, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 12") { securityLevel => - tpch.query(12, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 13", ignore) { securityLevel => - tpch.query(13, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 14") { securityLevel => - tpch.query(14, securityLevel, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 15") { securityLevel => - tpch.query(16, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 18", ignore) { securityLevel => - tpch.query(18, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 19") { securityLevel => - tpch.query(19, securityLevel, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 20") { securityLevel => - tpch.query(20, securityLevel, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 21", ignore) { securityLevel => - tpch.query(21, securityLevel, numPartitions).collect - } - - testAgainstSpark("TPC-H 22") { securityLevel => - tpch.query(22, securityLevel, numPartitions).collect + def runTests() = { + for (queryNum <- TPCHBenchmark.supportedQueries) { + val testStr = s"TPC-H $queryNum" + testAgainstSpark(testStr) { securityLevel => + tpch.query(queryNum, securityLevel, numPartitions).collect + } + 
} } } @@ -116,6 +44,8 @@ class TPCHSinglePartitionSuite extends TPCHTests { .appName("TPCHSinglePartitionSuite") .config("spark.sql.shuffle.partitions", numPartitions) .getOrCreate() + + runTests(); } class TPCHMultiplePartitionSuite extends TPCHTests { @@ -125,4 +55,6 @@ class TPCHMultiplePartitionSuite extends TPCHTests { .appName("TPCHMultiplePartitionSuite") .config("spark.sql.shuffle.partitions", numPartitions) .getOrCreate() + + runTests(); }