Commit 47b6577

Authored by Andrew Law (andrewlawhh), Eric Feng (eric-feng-2011), and Chester Leung (chester-leung)
Merge distinct agg (#192)
* Support for multiple branched CaseWhen
* Interval (#116)
  * add date_add; interval SQL still running into issues
  * Add Interval SQL support
  * uncomment the other tests; resolve comments; change interval equality
  Co-authored-by: Eric Feng <[email protected]>
* Remove partition ID argument from enclaves
* Fix comments
* updates
* Modifications to integrate crumb, log-mac, and all-outputs-mac (WIP)
* Store log mac after each output buffer; add all-outputs-mac to each EncryptedBlocks (WIP)
* Add all_outputs_mac to all EncryptedBlocks once all log_macs have been generated
* Almost builds; cpp builds
* Use ubyte for all_outputs_mac; use Mac for all_outputs_mac
* Hopefully this works for the flatbuffers all_outputs_mac mutation; cpp builds
* Scala builds now too; running into an error with union
* Stuff builds; error with all-outputs-mac serialization (this commit uses all_outputs_mac as a Mac table)
* Fixed bug; basic encryption / show works
* All single-partition tests pass; multiple-partition passes until TPC-H 9
* All tests pass except TPC-H 9 and skew join; comment TPC-H back in
* Check for the same number of ecalls per partition, with an exception for scanCollectLastPrimary(?)
* First attempt at constructing the executed DAG
* Fix typos; rework graph; add log macs to graph nodes
* Construct expected DAG and refactor JobNode; refactor construction of the executed DAG
* Implement 'paths to sink' for a DAG
* add crumb for last ecall
* Fix NULL handling for aggregation (#130)
  * Modify COUNT and SUM to correctly handle NULL values
  * Change average to support NULL values
* Changing operator matching from logical to physical (#129) (WIP; fix; unapply change)
* Aggregation rewrite (#132)
* updated build/sbt file (#135)
* Travis update (#137)
* update breeze (#138)
* TPC-H test suite added (#136)
  * added TPC-H SQL files; functions updated to save temp views; main function skeleton done; load and clear done; fix clear; performQuery done
  * import cleanup; use OPAQUE_HOME; TPC-H 9 refactored to use SQL rather than DataFrame operations; removed ": Unit" and unused imports
  * added TestUtils.scala and moved all common initialization there; update name
  * began rewriting TPCH.scala to store persistent tables; fixed an invalid table name error; TPCH converted to a class; compiles; added a second case and cleared up names
  * added TPC-H 6 to check that persistent state has no issues; added functions for the last two tables; addressed most logic changes
  * DataFrame only loaded once; apply method in companion object
  * full test suite added; added testFunc parameter to testAgainstSpark; ignore #18
* Separate IN PR (#124)
  * finishing the IN expression; adding more tests and null support; need confirmation on null behavior, and also I wonder why an integer field is sufficient for strings
  * saving the Concat implementation; it passes basic functionality tests
  * adding type-aware comparison and a better error message for the IN operator
  * adding null checking for the Concat operator and one additional test
  * cleaning up the IN & Concat PR; deleting Concat and prepping the branch for the IN PR
  * fixing null behavior: the result is now null only when there is no match and there is null input
  * Build failed
  Co-authored-by: Ubuntu <[email protected]>
  Co-authored-by: Wenting Zheng <[email protected]>
  Co-authored-by: Wenting Zheng <[email protected]>
* Merge new aggregate
* Uncomment log_mac_lst clear
* Clean up comments
* Separate Concat PR (#125): implementation of the CONCAT expression.
  Co-authored-by: Ubuntu <[email protected]>
  Co-authored-by: Wenting Zheng <[email protected]>
* Clean up comments in other files
* Update pathsEqual to be less conservative
* Remove print statements from unit tests
* Removed calls to toSet in TPC-H tests (#140); added them back where queries are unordered
* Documentation update (#148)
* Cluster Remote Attestation Fix (#146)
  The existing code only had remote attestation working when run locally. This PR adds a five-second sleep to make sure that all executors are spun up successfully before attestation begins. Closes #147
* upgrade to 3.0.1 (#144)
* Update two TPC-H queries (#149): tests for TPC-H 12 and 19 pass
* TPC-H 20 Fix (#142)
  * fixed a string-to-StringType error; TPC-H 20 passes; cleanup; implemented changes; Decimal.toFloat
  Co-authored-by: Wenting Zheng <[email protected]>
* Add expected operator DAG generation from the executedPlan string
* Rebase
* Join update (#145): merge the join update and integrate the new join
* Add expected operator for SortExec
* Merge comp-integrity with join update
* Remove some print statements
* Migrate from Travis CI to GitHub Actions (#156)
* Upgrade to OE 0.12 (#153)
* Update README.md
* Support for scalar subquery (#157) (see the encrypted-literal sketch after this commit list)
  This PR implements the scalar subquery expression, which is triggered whenever a subquery returns a scalar value. Two main problems needed to be solved. First, we need support for matching the scalar subquery expression: Spark implements it by wrapping a SparkPlan within the expression, calling executeCollect, and then constructing a literal with that value. That is problematic for us because the value is an intermediate result and should not be decrypted by the driver and serialized into an expression. The second issue, therefore, is supporting an encrypted literal. This PR does so by serializing an encrypted ciphertext into a base64-encoded string and wrapping a Decrypt expression on top of it; the expression is then evaluated in the enclave and returns a literal. Note that, to test our implementation, we also implement a Decrypt expression in Scala. It should never be evaluated on the driver side and serialized into a plaintext literal, because Decrypt is designated a Nondeterministic expression and therefore always evaluates on the workers.
* Add TPC-H Benchmarks (#139)
  * logic decoupling in TPCH.scala for easier benchmarking; added TPCHBenchmark.scala; Benchmark.scala rewrite
  * done adding all supported TPC-H query benchmarks
  * changed the command-line arguments the benchmark takes; TPCHBenchmark takes in parameters; size error handling; --help flag
  * fixed an issue with the Spark conf; add Utils.force; break cluster mode; comment out the logistic regression benchmark
  * ensureCached right before the temp view is created/replaced; upgrade to 3.0.1
  * 10 scale factor; persistData; almost done refactor; more cleanup; compiles; 9 passes
  * collect instead of force; remove sf_none; defaultParallelism; no removing trailing/leading whitespace; add sf_med
  * HDFS works in the local case; cleanup; added a new CLI argument; added newly supported TPC-H queries; function for running all supported tests
* Construct expected DAG from the dataframe physical plan
* Refactor collect and add an integrity-checking helper function to OpaqueOperatorTests
* Float expressions (#160)
  This PR adds the float normalization expressions [implemented in Spark](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala#L170). TPC-H query 2 also passes.
* Broadcast Nested Loop Join - Left Anti and Left Semi (#159)
  This PR is the first of two parts towards making TPC-H 16 work; the other is implementing `is_distinct` for aggregate operations. `BroadcastNestedLoopJoin` is Spark's "catch-all" for non-equi joins. It works by first picking a side to broadcast, then iterating through every possible row combination and checking the non-equi condition against the pair.
* Move join condition handling for equi-joins into enclave code (#164)
  * Add in TPC-H 21; add condition processing in enclave code; code clean-up; enable query 18; local tests pass; apply suggestions from code review; address comments; q21.sql
  Co-authored-by: octaviansima <[email protected]>
* Distinct aggregation support (#163) (a plain-Scala model of the four-stage approach appears after this commit list)
  Squashed WIP log: matching in strategies.scala; set up class thing; cleanup; added test cases for non-equi left anti join; rename to serializeEquiJoinExpression; added isEncrypted condition; set up keys; JoinExpr now has a condition; rename; serialization no longer throws a compile error for BNLJ; split up; added condition in ExpressionEvaluation.h; zipPartitions C++ put in place; typo; added func to header; two loops in place; update tests; condition fixed; Scala loop interchange; rows added; tags; ensure cached == match working; comparison decoupling in ExpressionEvaluation; save; compiles and condition works; is printing; fix swap outer/inner; o_i_match; show() has the same result; tests pass; test cleanup; added test cases for different conditions; BuildLeft works; optional keys in Scala; started C++; passes the operator tests; comments, cleanup; attempting to do it the ~right~ way; comments to distinguish between primary/secondary; operator tests pass; cleanup comments; about to begin implementation for distinct agg ops; is_distinct added; test case serializing with isDistinct; is_distinct in ExpressionEvaluation.h; removed unused code from the join implementation; remove RowWriter/Reader in condition evaluation (join); easier test; serialization done; correct checking in Scala; set is set up; spaghetti but it finally works; function for clearing values; condition_eval instead of condition; goto comment; remove explain from test; need to fix distinct aggregation for >1
  partitions; started impl of multiple partitions; fix; added a RangePartitionExec that runs partitioning; cleanup; serialization done properly; comments; generalization for >1 distinct function; about to refactor into logical.Aggregation; the new case has distinct in result expressions; need to match on distinct; removed the new case (doesn't make a difference?); works (rebase markers: Upgrade to OE 0.12 (#153), Update README.md, Support for scalar subquery (#157), and Add TPC-H Benchmarks (#139), as described above); match; remove RangePartitionExec; inefficient implementation refined; complete instead of partial -> final; removed traces of the join cleanup
  * added test case for one distinct and one non-distinct function; reverted a comment
  * removed the C++-level implementation of is_distinct
  * PartialMerge in operators.scala
  * stage 1: grouping with distinct expressions
  * stage 2: WIP; saving; sorting by group expressions ++ named distinct expressions worked
  * stages 1 & 2 print the expected results
  * removed an extraneous call to sorted; stage 3 in place but not working
  * stage 3 has the final, correct result: refactoring the Aggregate code to not cast aggregate expressions to Partial, PartialMerge, etc. will be needed
  * refactor done; C++ still printing the correct values
  * need to formalize the None case in EncryptedAggregateExec.output, but stage 4 passes
  * distinct and indistinct passes (git add -u)
  * general cleanup; the None case looks nicer
  * throw an error with >1 distinct; add a test case for global distinct
  * no need for a global aggregation case
  * single partition passes all aggregate tests; multiple
  partitions don't
  * works with global sort first; works with non-global sort first
  * cleanup; cleanup tests; removed iostream, other nits
  * added test case for 13
  * None case in the isPartial match done properly
  * added test cases for sumDistinct; case-specific namedDistinctExpressions working; distinct sum is done
  * removed comments; got rid of the mode argument
  * tests include null values
  * partition followed by a local sort instead of a global sort first
* Remove addExpectedOperator from JobVerificationEngine, add comments
* Implement expected DAG construction by doing graph manipulation on the dataframe field instead of string parsing
* Fix merge errors in the test cases

Co-authored-by: Andrew Law <[email protected]>
Co-authored-by: Eric Feng <[email protected]>
Co-authored-by: Eric Feng <[email protected]>
Co-authored-by: Chester Leung <[email protected]>
Co-authored-by: Wenting Zheng <[email protected]>
Co-authored-by: octaviansima <[email protected]>
Co-authored-by: Chenyu Shi <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: Wenting Zheng <[email protected]>
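Two sketches follow. First, the encrypted-literal wrapping described in the scalar subquery entry (#157): a minimal driver-side sketch in which `Decrypt` and `wrapScalarResult` are illustrative stand-ins rather than the project's actual API; only the base64-wrapping idea comes from the text above.

```scala
import java.util.Base64

// Stand-in for Opaque's Decrypt expression: in the real system this is a
// Nondeterministic Catalyst expression evaluated inside the enclave, so the
// driver never sees the plaintext scalar.
case class Decrypt(ciphertextB64: String)

// Hypothetical helper: embed an encrypted scalar subquery result into an
// expression tree without decrypting it on the driver.
def wrapScalarResult(encryptedBytes: Array[Byte]): Decrypt =
  Decrypt(Base64.getEncoder.encodeToString(encryptedBytes))
```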
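Second, the four-stage distinct aggregation that the #163 log walks through (partial, partial merge, partial-distinct, final). Here is a minimal plain-Scala model of that idea for one distinct function, e.g. COUNT(DISTINCT id) alongside COUNT(price) per category; it runs on local collections, ignores nulls, and is an illustration rather than the enclave implementation:

```scala
// Plain-Scala model of four-stage distinct aggregation, roughly
// SELECT category, COUNT(DISTINCT id), COUNT(price) GROUP BY category.
case class Record(category: String, id: Int, price: Int)

def distinctAgg(rows: Seq[Record]): Map[String, (Int, Long)] = {
  // Stages 1-2: partial aggregation grouped on (category, id), i.e. the
  // grouping keys plus the distinct column, which deduplicates `id` while
  // pre-aggregating the non-distinct COUNT(price) buffers.
  val partial: Map[(String, Int), Long] =
    rows.groupBy(r => (r.category, r.id))
      .map { case (key, rs) => key -> rs.size.toLong }

  // Stages 3-4: re-aggregate on the final grouping key; the distinct COUNT
  // sees one row per (category, id), and the non-distinct COUNT merges the
  // partial buffers.
  partial.groupBy { case ((category, _), _) => category }
    .map { case (category, m) => category -> (m.size, m.values.sum) }
}
```

The key move, visible in stages 1-2, is grouping on the grouping keys plus the distinct column so that deduplication and partial aggregation happen in the same pass.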
1 parent 697644b commit 47b6577

File tree

5 files changed: +165 additions, -46 deletions


src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala

Lines changed: 10 additions & 1 deletion
```diff
@@ -1435,6 +1435,10 @@ object Utils extends Logging {
           }
           (Seq(countUpdateExpr), Seq(count))
         }
+        case PartialMerge => {
+          val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0))
+          (Seq(countUpdateExpr), Seq(count))
+        }
         case Final => {
           val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0))
           (Seq(countUpdateExpr), Seq(count))
@@ -1443,7 +1447,7 @@ object Utils extends Logging {
           val countUpdateExpr = Add(count, Literal(1L))
           (Seq(countUpdateExpr), Seq(count))
         }
-        case _ =>
+        case _ =>
       }

       tuix.AggregateExpr.createAggregateExpr(
@@ -1614,6 +1618,11 @@ object Utils extends Logging {
           val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum)
           (Seq(sumUpdateExpr), Seq(sum))
         }
+        case PartialMerge => {
+          val partialSum = Add(If(IsNull(sum), Literal.default(sumDataType), sum), s.inputAggBufferAttributes(0))
+          val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum)
+          (Seq(sumUpdateExpr), Seq(sum))
+        }
         case Final => {
           val partialSum = Add(If(IsNull(sum), Literal.default(sumDataType), sum), s.inputAggBufferAttributes(0))
           val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum)
```
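For orientation: Partial folds raw input rows into an aggregation buffer, while PartialMerge and Final combine buffers that were already aggregated elsewhere, hence the `Add(count, c.inputAggBufferAttributes(0))` in both new cases. A plain-Scala analogue for COUNT, with illustrative names only:

```scala
// Plain-Scala analogue of the Count modes handled above (names illustrative).
object CountModes {
  // Partial / Complete: fold a raw input row into the running buffer,
  // mirroring Add(count, Literal(1L)).
  def updateWithRow(count: Long): Long = count + 1L

  // PartialMerge / Final: combine two partial buffers, mirroring
  // Add(count, c.inputAggBufferAttributes(0)).
  def mergeBuffers(count: Long, otherPartialCount: Long): Long =
    count + otherPartialCount
}
```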

src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SQLContext
 object TPCHBenchmark {

   // Add query numbers here once they are supported
-  val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 18, 19, 20, 21, 22)
+  val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20, 21, 22)

   def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = {
     val sqlStr = tpch.getQuery(queryNumber)
```
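TPC-H 16 is the query newly enabled by distinct-aggregation support. A hedged sketch of how the suite might be driven, assuming `tpch`, `sqlContext`, and `numPartitions` are already in scope; the loop itself is illustrative, not taken from the project:

```scala
// Run every supported TPC-H query, now including query 16 (illustrative loop).
for (queryNumber <- TPCHBenchmark.supportedQueries) {
  TPCHBenchmark.query(queryNumber, tpch, sqlContext, numPartitions)
}
```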

src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala

Lines changed: 16 additions & 25 deletions
```diff
@@ -243,44 +243,35 @@ case class EncryptedFilterExec(condition: Expression, child: SparkPlan)

 case class EncryptedAggregateExec(
     groupingExpressions: Seq[NamedExpression],
-    aggExpressions: Seq[AggregateExpression],
-    mode: AggregateMode,
+    aggregateExpressions: Seq[AggregateExpression],
     child: SparkPlan)
   extends UnaryExecNode with OpaqueOperatorExec {

   override def producedAttributes: AttributeSet =
-    AttributeSet(aggExpressions) -- AttributeSet(groupingExpressions)
-
-  override def output: Seq[Attribute] = mode match {
-    case Partial => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes)
-    case Final => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
-    case Complete => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
-  }
+    AttributeSet(aggregateExpressions) -- AttributeSet(groupingExpressions)
+
+  override def output: Seq[Attribute] = groupingExpressions.map(_.toAttribute) ++
+    aggregateExpressions.flatMap(expr => {
+      expr.mode match {
+        case Partial | PartialMerge =>
+          expr.aggregateFunction.inputAggBufferAttributes
+        case _ =>
+          Seq(expr.resultAttribute)
+      }
+    })

   override def executeBlocked(): RDD[Block] = {

-    val (groupingExprs, aggExprs) = mode match {
-      case Partial => {
-        val partialAggExpressions = aggExpressions.map(_.copy(mode = Partial))
-        (groupingExpressions, partialAggExpressions)
-      }
-      case Final => {
-        val finalGroupingExpressions = groupingExpressions.map(_.toAttribute)
-        val finalAggExpressions = aggExpressions.map(_.copy(mode = Final))
-        (finalGroupingExpressions, finalAggExpressions)
-      }
-      case Complete => {
-        (groupingExpressions, aggExpressions.map(_.copy(mode = Complete)))
-      }
-    }
+    val aggExprSer = Utils.serializeAggOp(groupingExpressions, aggregateExpressions, child.output)
+    val isPartial = aggregateExpressions.map(expr => expr.mode)
+      .exists(mode => mode == Partial || mode == PartialMerge)

-    val aggExprSer = Utils.serializeAggOp(groupingExprs, aggExprs, child.output)

     timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") {
       childRDD =>
         childRDD.map { block =>
           val (enclave, eid) = Utils.initEnclave()
-          Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial)))
+          Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, isPartial))
         }
     }
   }
```
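The structural change here: the operator-wide `mode` parameter is gone, and each AggregateExpression carries its own mode, so a single EncryptedAggregateExec can mix PartialMerge functions with Partial (distinct) ones. A minimal stand-in model of the new per-expression output rule, using hypothetical types rather than the project's classes:

```scala
// Stand-in model of the per-expression output rule (hypothetical types).
sealed trait Mode
case object Partial extends Mode
case object PartialMerge extends Mode
case object Final extends Mode

case class AggExpr(mode: Mode, bufferAttrs: Seq[String], resultAttr: String)

// Grouping columns first, then per-expression attributes: buffer attributes
// for Partial/PartialMerge stages, the result attribute otherwise.
def outputAttrs(groupingAttrs: Seq[String], aggExprs: Seq[AggExpr]): Seq[String] =
  groupingAttrs ++ aggExprs.flatMap { expr =>
    expr.mode match {
      case Partial | PartialMerge => expr.bufferAttrs
      case _                      => Seq(expr.resultAttr)
    }
  }
```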

src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala

Lines changed: 84 additions & 19 deletions
```diff
@@ -152,25 +152,90 @@ object OpaqueOperators extends Strategy {
       if (isEncrypted(child) && aggExpressions.forall(expr => expr.isInstanceOf[AggregateExpression])) =>

       val aggregateExpressions = aggExpressions.map(expr => expr.asInstanceOf[AggregateExpression])
-
-      if (groupingExpressions.size == 0) {
-        // Global aggregation
-        val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child))
-        val partialOutput = partialAggregate.output
-        val (projSchema, tag) = tagForGlobalAggregate(partialOutput)
-
-        EncryptedProjectExec(resultExpressions,
-          EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final,
-            EncryptedProjectExec(partialOutput,
-              EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true,
-                EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil
-      } else {
-        // Grouping aggregation
-        EncryptedProjectExec(resultExpressions,
-          EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final,
-            EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true,
-              EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial,
-                EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil
+      val (functionsWithDistinct, functionsWithoutDistinct) = aggregateExpressions.partition(_.isDistinct)
+
+      functionsWithDistinct.size match {
+        case 0 => // No distinct aggregate operations
+          if (groupingExpressions.size == 0) {
+            // Global aggregation
+            val partialAggregate = EncryptedAggregateExec(groupingExpressions,
+              aggregateExpressions.map(_.copy(mode = Partial)), planLater(child))
+            val partialOutput = partialAggregate.output
+            val (projSchema, tag) = tagForGlobalAggregate(partialOutput)
+
+            EncryptedProjectExec(resultExpressions,
+              EncryptedAggregateExec(groupingExpressions, aggregateExpressions.map(_.copy(mode = Final)),
+                EncryptedProjectExec(partialOutput,
+                  EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true,
+                    EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil
+          } else {
+            // Grouping aggregation
+            EncryptedProjectExec(resultExpressions,
+              EncryptedAggregateExec(groupingExpressions, aggregateExpressions.map(_.copy(mode = Final)),
+                EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true,
+                  EncryptedAggregateExec(groupingExpressions, aggregateExpressions.map(_.copy(mode = Partial)),
+                    EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil
+          }
+        case size if size == 1 => // One distinct aggregate operation
+          // Because we are also grouping on the columns used in the distinct expressions,
+          // we do not need separate cases for global and grouping aggregation.
+
+          // We need to extract named expressions from the children of the distinct aggregate functions
+          // in order to group by those columns.
+          val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.flatMap { e =>
+            e match {
+              case ne: NamedExpression =>
+                Seq(ne)
+              case _ =>
+                e.children.filter(child => child.isInstanceOf[NamedExpression])
+                  .map(child => child.asInstanceOf[NamedExpression])
+            }
+          }
+          val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions
+
+          // 1. Create an Aggregate operator for partial aggregations.
+          val partialAggregate = {
+            val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false,
+              planLater(child))
+            EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct.map(_.copy(mode = Partial)), sorted)
+          }
+
+          // 2. Create an Aggregate operator for partial merge aggregations.
+          val partialMergeAggregate = {
+            // Partition based on the final grouping expressions.
+            val partitionOrder = groupingExpressions.map(e => SortOrder(e, Ascending))
+            val partitioned = EncryptedRangePartitionExec(partitionOrder, partialAggregate)
+
+            // Local sort on the combined grouping expressions.
+            val sortOrder = combinedGroupingExpressions.map(e => SortOrder(e, Ascending))
+            val sorted = EncryptedSortExec(sortOrder, false, partitioned)
+
+            EncryptedAggregateExec(combinedGroupingExpressions,
+              functionsWithoutDistinct.map(_.copy(mode = PartialMerge)), sorted)
+          }
+
+          // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions.
+          val partialDistinctAggregate = {
+            // Indistinct functions operate on aggregation buffers since partial aggregation was already called,
+            // but distinct functions operate on the original input to the aggregation.
+            EncryptedAggregateExec(groupingExpressions,
+              functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++
+                functionsWithDistinct.map(_.copy(mode = Partial)), partialMergeAggregate)
+          }
+
+          // 4. Create an Aggregate operator for the final aggregation.
+          val finalAggregate = {
+            val sorted = EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)),
+              true, partialDistinctAggregate)
+            EncryptedAggregateExec(groupingExpressions,
+              (functionsWithoutDistinct ++ functionsWithDistinct).map(_.copy(mode = Final)), sorted)
+          }
+
+          EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil
+
+        case _ => { // More than one distinct operation
+          throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions are not yet supported.")
+        }
       }

     case p @ Union(Seq(left, right)) if isEncrypted(p) =>
```
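For reference, a query shape that should exercise the new single-distinct branch, mirroring the tests added below; `df` is assumed to be an encrypted Opaque DataFrame:

```scala
import org.apache.spark.sql.functions.{count, countDistinct}

// One distinct function mixed with a non-distinct one: planned as the
// four-stage pipeline above (Partial on combined keys, then PartialMerge,
// then PartialMerge ++ Partial(distinct), then Final and a project).
val result = df.groupBy("category")
  .agg(countDistinct("id").as("num_unique_ids"),
       count("price").as("num_prices"))
```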

src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala

Lines changed: 54 additions & 0 deletions
```diff
@@ -489,6 +489,30 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self =>
       .sortBy { case Row(category: String, _) => category }
   }

+  testAgainstSpark("aggregate count distinct and indistinct") { securityLevel =>
+    val data = (0 until 64).map { i =>
+      if (i % 6 == 0)
+        (abc(i), null.asInstanceOf[Int], i % 8)
+      else
+        (abc(i), i % 4, i % 8)
+    }.toSeq
+    val words = makeDF(data, securityLevel, "category", "id", "price")
+    words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"),
+      count("price").as("num_prices")).collect.toSet
+  }
+
+  testAgainstSpark("aggregate count distinct") { securityLevel =>
+    val data = (0 until 64).map { i =>
+      if (i % 6 == 0)
+        (abc(i), null.asInstanceOf[Int])
+      else
+        (abc(i), i % 8)
+    }.toSeq
+    val words = makeDF(data, securityLevel, "category", "price")
+    words.groupBy("category").agg(countDistinct("price").as("num_unique_prices"))
+      .collect.sortBy { case Row(category: String, _) => category }
+  }
+
   testAgainstSpark("aggregate first") { securityLevel =>
     val data = for (i <- 0 until 256) yield (i, abc(i), 1)
     val words = makeDF(data, securityLevel, "id", "category", "price")
@@ -536,6 +560,30 @@
       .sortBy { case Row(word: String, _) => word }
   }

+  testAgainstSpark("aggregate sum distinct and indistinct") { securityLevel =>
+    val data = (0 until 64).map { i =>
+      if (i % 6 == 0)
+        (abc(i), null.asInstanceOf[Int], i % 8)
+      else
+        (abc(i), i % 4, i % 8)
+    }.toSeq
+    val words = makeDF(data, securityLevel, "category", "id", "price")
+    words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"),
+      sum("price").as("sum_prices")).collect.toSet
+  }
+
+  testAgainstSpark("aggregate sum distinct") { securityLevel =>
+    val data = (0 until 64).map { i =>
+      if (i % 6 == 0)
+        (abc(i), null.asInstanceOf[Int])
+      else
+        (abc(i), i % 8)
+    }.toSeq
+    val words = makeDF(data, securityLevel, "category", "price")
+    words.groupBy("category").agg(sumDistinct("price").as("sum_unique_prices"))
+      .collect.sortBy { case Row(category: String, _) => category }
+  }
+
   testAgainstSpark("aggregate on multiple columns") { securityLevel =>
     val data = for (i <- 0 until 256) yield (abc(i), 1, 1.0f)
     val words = makeDF(data, securityLevel, "str", "x", "y")
@@ -567,6 +615,12 @@
     integrityCollect(words.agg(sum("count").as("totalCount")))
   }

+  testAgainstSpark("global aggregate count distinct") { securityLevel =>
+    val data = for (i <- 0 until 256) yield (i, abc(i), i % 64)
+    val words = makeDF(data, securityLevel, "id", "word", "price")
+    words.agg(countDistinct("price").as("num_unique_prices")).collect
+  }
+
   testAgainstSpark("global aggregate with 0 rows") { securityLevel =>
     val data = for (i <- 0 until 256) yield (i, abc(i), 1)
     val words = makeDF(data, securityLevel, "id", "word", "count")
```
