Skip to content

Support for distinct aggregations #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

octaviansima
Copy link
Collaborator

This is part 2 of making TPC-H 16 work.

Currently, our aggregation code does not support aggregate expressions that have isDistinct set to true. This PR intends to add support for that by doing two things:

  1. Serializing an is_distinct field for aggregate expressions and implementing a set-based tracker in C++ that updates rows according to the aggregate expression if is_distinct is false or if the given value was not already seen.
  2. In Scala, checking if an aggregate operation has any aggregate expressions with isDistinct = true. If this is the case, then we need to perform a global sort before the partial aggregation. This is to make sure that the same aggregate values are not distributed across different partitions and are not counted more than once.

@octaviansima octaviansima requested a review from wzheng February 19, 2021 22:39
Copy link
Collaborator

@wzheng wzheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review

Comment on lines -33 to +34
// Skip outputting the final row if the number of input rows is 0 AND
// 1. It's a grouping aggregation, OR
// Skip outputting the final row if:
// 1. The number of input rows is 0 AND it's a grouping aggregation, OR
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this comment changed? I don't think the new meaning is equivalent to what the code says?

val (functionsWithDistinct, functionsWithoutDistinct) = aggregateExpressions.partition(_.isDistinct)

functionsWithDistinct.size match {
case size if size == 0 => // No distinct aggregate operations
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this case 0 since you're matching on functionsWithDistinct.size?

EncryptedProjectExec(resultExpressions,
EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Complete,
EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), true, planLater(child)))) :: Nil
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you see what happens when there are multiple distincts? We should catch it here or somewhere else and say that we do not support it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting org.apache.spark.sql.execution.WholeStageCodegenExec cannot be cast to edu.berkeley.cs.rise.opaque.execution.OpaqueOperatorExec.

words.groupBy("category").agg(countDistinct("price").as("distinctPrices"))
.collect.sortBy { case Row(category: String, _) => category }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add another test for global distinct aggregation, as well as tests for when the number of distinct items is 0?

set up class thing

cleanup

added test cases for non-equi left anti join

rename to serializeEquiJoinExpression

added isEncrypted condition

set up keys

JoinExpr now has condition

rename

serialization does not throw compile error for BNLJ

split up

added condition in ExpressionEvaluation.h

zipPartitions

cpp put in place

typo

added func to header

two loops in place

update tests

condition

fixed scala loop

interchange rows

added tags

ensure cached

== match working

comparison decoupling in ExpressionEvalulation

save

compiles and condition works

is printing

fix swap outer/inner

o_i_match

show() has the same result

tests pass

test cleanup

added test cases for different condition

BuildLeft works

optional keys in scala

started C++

passes the operator tests

comments, cleanup

attemping to do it the ~right~ way

comments to distinguish between primary/secondary, operator tests pass

cleanup comments, about to begin implementation for distinct agg ops

is_distinct

added test case

serializing with isDistinct

is_distinct in ExpressionEvaluation.h

removed unused code from join implementation

remove RowWriter/Reader in condition evaluation (join)

easier test

serialization done

correct checking in Scala

set is set up

spaghetti but it finally works

function for clearing values

condition_eval isntead of condition

goto

comment

remove explain from test, need to fix distinct aggregation for >1 partitions

started impl of multiple partitions fix

added rangepartitionexec that runs

partitioning cleanup

serialization properly

comments, generalization for > 1 distinct function

comments

about to refactor into logical.Aggregation

the new case has distinct in result expressions

need to match on distinct

removed new case (doesn't make difference?)

works

Upgrade to OE 0.12 (mc2-project#153)

Update README.md

Support for scalar subquery (mc2-project#157)

This PR implements the scalar subquery expression, which is triggered whenever a subquery returns a scalar value. There were two main problems that needed to be solved.

First, support for matching the scalar subquery expression is necessary. Spark implements this by wrapping a SparkPlan within the expression and calls executeCollect. Then it constructs a literal with that value. However, this is problematic for us because that value should not be decrypted by the driver and serialized into an expression, since it's an intermediate value.

Therefore, the second issue to be addressed here is supporting an encrypted literal. This is implemented in this PR by serializing an encrypted ciphertext into a base64 encoded string, and wrapping a Decrypt expression on top of it. This expression is then evaluated in the enclave and returns a literal. Note that, in order to test our implementation, we also implement a Decrypt expression in Scala. However, this should never be evaluated on the driver side and serialized into a plaintext literal. This is because Decrypt is designated as a Nondeterministic expression, and therefore will always evaluate on the workers.

match

remove RangePartitionExec

inefficient implementation refined

Add TPC-H Benchmarks (mc2-project#139)

* logic decoupling in TPCH.scala for easier benchmarking

* added TPCHBenchmark.scala

* Benchmark.scala rewrite

* done adding all support TPC-H query benchmarks

* changed commandline arguments that benchmark takes

* TPCHBenchmark takes in parameters

* fixed issue with spark conf

* size error handling, --help flag

* add Utils.force, break cluster mode

* comment out logistic regression benchmark

* ensureCached right before temp view created/replaced

* upgrade to 3.0.1

* upgrade to 3.0.1

* 10 scale factor

* persistData

* almost done refactor

* more cleanup

* compiles

* 9 passes

* cleanup

* collect instead of force, sf_none

* remove sf_none

* defaultParallelism

* no removing trailing/leading whitespace

* add sf_med

* hdfs works in local case

* cleanup, added new CLI argument

* added newly supported tpch queries

* function for running all supported tests

complete instead of partial -> final

removed traces of join

cleanup
@octaviansima
Copy link
Collaborator Author

See #163

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants