Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3ef47a1
added tpch sql files
octaviansima Jan 26, 2021
b05ac81
functions updated to save temp view
octaviansima Jan 26, 2021
36e97c1
main function skeleton done
octaviansima Jan 26, 2021
a7780df
load and clear done
octaviansima Jan 26, 2021
64ec399
fix clear
octaviansima Jan 26, 2021
d7b99e2
performQuery done
octaviansima Jan 26, 2021
4b4859c
Merge branch 'master' into tpch
octaviansima Jan 26, 2021
91644c2
import cleanup, use OPAQUE_HOME
octaviansima Jan 26, 2021
6406e4a
TPC-H 9 refactored to use SQL rather than DF operations
octaviansima Jan 26, 2021
1c9807f
removed : Unit, unused imports
octaviansima Jan 26, 2021
a07752a
added TestUtils.scala
octaviansima Jan 27, 2021
cc6e919
moved all common initialization to TestUtils
octaviansima Jan 27, 2021
de67ac0
update name
octaviansima Jan 27, 2021
f8c740b
begin rewriting TPCH.scala to store persistent tables
octaviansima Jan 27, 2021
99040eb
invalid table name error
octaviansima Jan 27, 2021
f388bb7
TPCH conversion to class started
octaviansima Jan 27, 2021
c515761
compiles
octaviansima Jan 27, 2021
d5ce84b
added second case, cleared up names
octaviansima Jan 27, 2021
53d2836
added TPC-H 6 to check that persistent state has no issues
octaviansima Jan 27, 2021
96afc47
added functions for the last two tables
octaviansima Jan 27, 2021
d5a3268
addressed most logic changes
octaviansima Jan 27, 2021
e3ce7b3
DataFrame only loaded once
octaviansima Jan 28, 2021
f7e47da
apply method in companion object
octaviansima Jan 28, 2021
85035c2
full test suite added
octaviansima Jan 28, 2021
e01ea0c
added testFunc parameter to testAgainstSpark
octaviansima Jan 28, 2021
e213a89
Merge remote-tracking branch 'upstream/master' into tpch
octaviansima Jan 29, 2021
e5004a2
ignore #18
octaviansima Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 62 additions & 59 deletions src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

package edu.berkeley.cs.rise.opaque.benchmark

import java.io.File
import scala.io.Source

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.util.fileToString
import org.apache.spark.sql.catalyst.util.resourceToString

object TPCH {
def part(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: DataFrame =
: Unit =
securityLevel.applyTo(
sqlContext.read.schema(
StructType(Seq(
Expand All @@ -42,10 +46,11 @@ object TPCH {
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/part.tbl")
.repartition(numPartitions))
.createOrReplaceTempView("part")

def supplier(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: DataFrame =
: Unit =
securityLevel.applyTo(
sqlContext.read.schema(
StructType(Seq(
Expand All @@ -60,10 +65,11 @@ object TPCH {
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/supplier.tbl")
.repartition(numPartitions))
.createOrReplaceTempView("supplier")

def lineitem(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: DataFrame =
: Unit =
securityLevel.applyTo(
sqlContext.read.schema(
StructType(Seq(
Expand All @@ -86,11 +92,12 @@ object TPCH {
.format("csv")
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/lineitem.tbl")
.repartition(numPartitions))
.repartition(numPartitions))
.createOrReplaceTempView("lineitem")

def partsupp(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: DataFrame =
: Unit =
securityLevel.applyTo(
sqlContext.read.schema(
StructType(Seq(
Expand All @@ -103,10 +110,11 @@ object TPCH {
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/partsupp.tbl")
.repartition(numPartitions))
.createOrReplaceTempView("partsupp")

def orders(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: DataFrame =
: Unit =
securityLevel.applyTo(
sqlContext.read.schema(
StructType(Seq(
Expand All @@ -123,10 +131,11 @@ object TPCH {
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/orders.tbl")
.repartition(numPartitions))
.createOrReplaceTempView("orders")

def nation(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: DataFrame =
: Unit =
securityLevel.applyTo(
sqlContext.read.schema(
StructType(Seq(
Expand All @@ -138,59 +147,53 @@ object TPCH {
.option("delimiter", "|")
.load(s"${Benchmark.dataDir}/tpch/$size/nation.tbl")
.repartition(numPartitions))
.createOrReplaceTempView("nation")

def loadTables(
queryNumber: Int,
sqlContext: SQLContext,
securityLevel: SecurityLevel,
size: String,
numPartitions: Int) : Unit = {

private def tpch9EncryptedDFs(
sqlContext: SQLContext, securityLevel: SecurityLevel, size: String, numPartitions: Int)
: (DataFrame, DataFrame, DataFrame, DataFrame, DataFrame, DataFrame) = {
val partDF = part(sqlContext, securityLevel, size, numPartitions)
val supplierDF = supplier(sqlContext, securityLevel, size, numPartitions)
val lineitemDF = lineitem(sqlContext, securityLevel, size, numPartitions)
val partsuppDF = partsupp(sqlContext, securityLevel, size, numPartitions)
val ordersDF = orders(sqlContext, securityLevel, size, numPartitions)
val nationDF = nation(sqlContext, securityLevel, size, numPartitions)
(partDF, supplierDF, lineitemDF, partsuppDF, ordersDF, nationDF)
queryNumber match {
case 9 => {
part(sqlContext, securityLevel, size, numPartitions)
supplier(sqlContext, securityLevel, size, numPartitions)
lineitem(sqlContext, securityLevel, size, numPartitions)
partsupp(sqlContext, securityLevel, size, numPartitions)
orders(sqlContext, securityLevel, size, numPartitions)
nation(sqlContext, securityLevel, size, numPartitions)
}
}
}

def clearTables(sqlContext: SQLContext) : Unit = {
val tableNames = Seq("part", "supplier", "lineitem", "partsupp", "orders", "nation")

for (tableName <- tableNames) {
sqlContext.sql(s"""DROP TABLE IF EXISTS default.${tableName}""".stripMargin)
}
}

/** TPC-H query 9 - Product Type Profit Measure Query */
def tpch9(
sqlContext: SQLContext,
securityLevel: SecurityLevel,
size: String,
numPartitions: Int,
quantityThreshold: Option[Int] = None) : DataFrame = {
import sqlContext.implicits._
val (partDF, supplierDF, lineitemDF, partsuppDF, ordersDF, nationDF) =
tpch9EncryptedDFs(sqlContext, securityLevel, size, numPartitions)

val df =
ordersDF.select($"o_orderkey", year($"o_orderdate").as("o_year")) // 6. orders
.join(
(nationDF// 4. nation
.join(
supplierDF // 3. supplier
.join(
partDF // 1. part
.filter($"p_name".contains("maroon"))
.select($"p_partkey")
.join(partsuppDF, $"p_partkey" === $"ps_partkey"), // 2. partsupp
$"ps_suppkey" === $"s_suppkey"),
$"s_nationkey" === $"n_nationkey"))
.join(
// 5. lineitem
quantityThreshold match {
case Some(q) => lineitemDF.filter($"l_quantity" > lit(q))
case None => lineitemDF
},
$"s_suppkey" === $"l_suppkey" && $"p_partkey" === $"l_partkey"),
$"l_orderkey" === $"o_orderkey")
.select(
$"n_name",
$"o_year",
($"l_extendedprice" * (lit(1) - $"l_discount") - $"ps_supplycost" * $"l_quantity")
.as("amount"))
.groupBy("n_name", "o_year").agg(sum($"amount").as("sum_profit"))

df
}
def performQuery(queryNumber: Int, sqlContext: SQLContext) : DataFrame = {
val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/"
val sqlStr = Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n")

sqlContext.sparkSession.sql(sqlStr)
}

def tpch(
queryNumber: Int,
sqlContext: SQLContext,
securityLevel: SecurityLevel,
size: String,
numPartitions: Int) : DataFrame = {

loadTables(queryNumber, sqlContext, securityLevel, size, numPartitions)
val df = performQuery(queryNumber, sqlContext)
clearTables(sqlContext)

df
}
}
23 changes: 23 additions & 0 deletions src/test/resources/tpch/q1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- using default substitutions

select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-12-01' - interval '90' day
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus
34 changes: 34 additions & 0 deletions src/test/resources/tpch/q10.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- using default substitutions

select
c_custkey,
c_name,
sum(l_extendedprice * (1 - l_discount)) as revenue,
c_acctbal,
n_name,
c_address,
c_phone,
c_comment
from
customer,
orders,
lineitem,
nation
where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate >= date '1993-10-01'
and o_orderdate < date '1993-10-01' + interval '3' month
and l_returnflag = 'R'
and c_nationkey = n_nationkey
group by
c_custkey,
c_name,
c_acctbal,
c_phone,
n_name,
c_address,
c_comment
order by
revenue desc
limit 20
29 changes: 29 additions & 0 deletions src/test/resources/tpch/q11.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- using default substitutions

select
ps_partkey,
sum(ps_supplycost * ps_availqty) as value
from
partsupp,
supplier,
nation
where
ps_suppkey = s_suppkey
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
group by
ps_partkey having
sum(ps_supplycost * ps_availqty) > (
select
sum(ps_supplycost * ps_availqty) * 0.0001000000
from
partsupp,
supplier,
nation
where
ps_suppkey = s_suppkey
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
)
order by
value desc
30 changes: 30 additions & 0 deletions src/test/resources/tpch/q12.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- using default substitutions

select
l_shipmode,
sum(case
when o_orderpriority = '1-URGENT'
or o_orderpriority = '2-HIGH'
then 1
else 0
end) as high_line_count,
sum(case
when o_orderpriority <> '1-URGENT'
and o_orderpriority <> '2-HIGH'
then 1
else 0
end) as low_line_count
from
orders,
lineitem
where
o_orderkey = l_orderkey
and l_shipmode in ('MAIL', 'SHIP')
and l_commitdate < l_receiptdate
and l_shipdate < l_commitdate
and l_receiptdate >= date '1994-01-01'
and l_receiptdate < date '1994-01-01' + interval '1' year
group by
l_shipmode
order by
l_shipmode
22 changes: 22 additions & 0 deletions src/test/resources/tpch/q13.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- using default substitutions

select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer left outer join orders on
c_custkey = o_custkey
and o_comment not like '%special%requests%'
group by
c_custkey
) as c_orders
group by
c_count
order by
custdist desc,
c_count desc
15 changes: 15 additions & 0 deletions src/test/resources/tpch/q14.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- using default substitutions

select
100.00 * sum(case
when p_type like 'PROMO%'
then l_extendedprice * (1 - l_discount)
else 0
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from
lineitem,
part
where
l_partkey = p_partkey
and l_shipdate >= date '1995-09-01'
and l_shipdate < date '1995-09-01' + interval '1' month
35 changes: 35 additions & 0 deletions src/test/resources/tpch/q15.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- using default substitutions

with revenue0 as
(select
l_suppkey as supplier_no,
sum(l_extendedprice * (1 - l_discount)) as total_revenue
from
lineitem
where
l_shipdate >= date '1996-01-01'
and l_shipdate < date '1996-01-01' + interval '3' month
group by
l_suppkey)


select
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
from
supplier,
revenue0
where
s_suppkey = supplier_no
and total_revenue = (
select
max(total_revenue)
from
revenue0
)
order by
s_suppkey

Loading