From d2568de95dc3d5424e36f2437544c85331cac973 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Mon, 8 Feb 2021 18:38:35 +0000 Subject: [PATCH 01/16] WIP --- .../cs/rise/opaque/OpaqueOperatorTests.scala | 15 +++++++++++++++ .../berkeley/cs/rise/opaque/OpaqueTestsBase.scala | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index c8926c3df7..02f5ccf564 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -852,6 +852,21 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted } + testAgainstSpark("Scalar subquery") { securityLevel => + // Example taken from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2728434780191932/1483312212640900/6987336228780374/latest.html + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "word", "count") + words.createTempView("words") + + try { + val df = spark.sql("""SELECT id, word, (SELECT MAX(count) FROM words) max_age FROM words ORDER BY id, word""") + df.explain + df.collect + } finally { + spark.catalog.dropTempView("words") + } + } + testAgainstSpark("pagerank") { securityLevel => PageRank.run(spark, securityLevel, "256", numPartitions).collect.toSet } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueTestsBase.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueTestsBase.scala index 8117fb8de1..54ded162bc 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueTestsBase.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueTestsBase.scala @@ -68,7 +68,7 @@ trait OpaqueTestsBase extends FunSuite with BeforeAndAfterAll { self => testFunc(name + " - encrypted") { // The === operator uses implicitly[Equality[A]], which compares Double and Array[Double] // using the numeric tolerance specified above - assert(f(Encrypted) === f(Insecure)) + assert(f(Insecure) === f(Encrypted)) } } @@ -102,4 +102,4 @@ trait OpaqueTestsBase extends FunSuite with BeforeAndAfterAll { self => } } } -} \ No newline at end of file +} From 2573a90ac76181d12d143236365fe487b63ee562 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Tue, 9 Feb 2021 01:54:48 +0000 Subject: [PATCH 02/16] Functions for encrypting/decrypting scalar values --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 37 +++++++++++++++++-- .../cs/rise/opaque/execution/operators.scala | 3 +- .../cs/rise/opaque/OpaqueOperatorTests.scala | 11 +++++- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 5a85154253..8902e3fc90 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -21,7 +21,9 @@ import java.io.File import java.io.FileNotFoundException import java.nio.ByteBuffer import java.nio.ByteOrder +import java.nio.charset.StandardCharsets; import java.security.SecureRandom +import java.util.Base64 import java.util.UUID import javax.crypto._ @@ -93,6 +95,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.ArrayBasedMapData import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.catalyst.util.MapData +import org.apache.spark.sql.execution.ScalarSubquery import org.apache.spark.sql.execution.aggregate.ScalaUDAF import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -350,8 +353,6 @@ object Utils extends Logging { rdd.foreach(x => {}) } - - def flatbuffersCreateField( builder: FlatBufferBuilder, value: Any, dataType: DataType, isNull: Boolean): Int = { (value, dataType) match { @@ -578,6 +579,7 @@ object Utils extends Logging { tuix.StringField.createValueVector(builder, Array.empty), 0), isNull) + case _ => throw new OpaqueException(s"FlatbuffersCreateField failed to match on ${value} of type {value.getClass.getName()}, ${dataType}") } } @@ -652,6 +654,34 @@ object Utils extends Logging { val MaxBlockSize = 1000 + /** + * Encrypts/decrypts a given scalar value + **/ + def encryptScalar(value: Any, dataType: DataType): String = { + // First serialize the scalar value + var builder = new FlatBufferBuilder + var rowOffsets = ArrayBuilder.make[Int] + + val v = dataType match { + case StringType => UTF8String.fromString(value.asInstanceOf[String]) + case _ => value + } + + builder.finish(flatbuffersCreateField(builder, v, dataType, false)) + val plaintext = builder.sizedByteArray() + val ciphertext = encrypt(plaintext) + val ciphertext_str = Base64.getEncoder().encodeToString(ciphertext); + ciphertext_str + } + + def decryptScalar(ciphertext: String): Any = { + val ciphertext_bytes = Base64.getDecoder().decode(ciphertext); + val plaintext = decrypt(ciphertext_bytes) + val field = tuix.Field.getRootAsField(ByteBuffer.wrap(plaintext)) + val value = flatbuffersExtractFieldValue(field) + value + } + /** * Encrypts the given Spark SQL [[InternalRow]]s into a [[Block]] (a serialized * tuix.EncryptedBlocks). @@ -792,7 +822,7 @@ object Utils extends Logging { tuix.ExprUnion.Col, tuix.Col.createCol(builder, colNum)) - case (Literal(value, dataType), Nil) => + case (Literal(value, dataType), Nil) => val valueOffset = flatbuffersCreateField(builder, value, dataType, (value == null)) tuix.Expr.createExpr( builder, @@ -1087,6 +1117,7 @@ object Utils extends Logging { tuix.ExprUnion.ClosestPoint, tuix.ClosestPoint.createClosestPoint( builder, leftOffset, rightOffset)) + } } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index e40acbff78..4e68490339 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -131,7 +131,8 @@ trait OpaqueOperatorExec extends SparkPlan { * method and persist the resulting RDD. [[ConvertToOpaqueOperators]] later eliminates the dummy * relation from the logical plan, but this only happens after InMemoryRelation has called this * method. We therefore have to silently return an empty RDD here. - */ + */ + override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.emptyRDD // throw new UnsupportedOperationException("use executeBlocked") diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 02f5ccf564..b05d870e7b 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -852,7 +852,14 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted } - testAgainstSpark("Scalar subquery") { securityLevel => + testOpaqueOnly("encrypted literal") { securityLevel => + val str = "hello world" + val enc_str = Utils.encryptScalar(str, StringType) + val dec_str = Utils.decryptScalar(enc_str) + println(s"dec_str = ${dec_str}") + } + + testAgainstSpark("scalar subquery", ignore) { securityLevel => // Example taken from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2728434780191932/1483312212640900/6987336228780374/latest.html val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "word", "count") @@ -860,7 +867,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => try { val df = spark.sql("""SELECT id, word, (SELECT MAX(count) FROM words) max_age FROM words ORDER BY id, word""") - df.explain + df.explain(true) df.collect } finally { spark.catalog.dropTempView("words") From cb1320b2e83b97cf11fe605beaed6bf7eb0e8f9a Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Tue, 9 Feb 2021 03:07:58 +0000 Subject: [PATCH 03/16] WIP --- src/enclave/Enclave/ExpressionEvaluation.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 80475b877f..a817073aba 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -288,6 +288,19 @@ class FlatbuffersExpressionEvaluator { static_cast(expr->expr())->value(), builder); } + case tuix::ExprUnion_Decrypt: + { + auto add = static_cast(expr->expr()); + auto left_offset = eval_helper(row, add->left()); + auto right_offset = eval_helper(row, add->right()); + + return eval_binary_arithmetic_op( + builder, + flatbuffers::GetTemporaryPointer(builder, left_offset), + flatbuffers::GetTemporaryPointer(builder, right_offset)); + + } + case tuix::ExprUnion_Cast: { auto cast = static_cast(expr->expr()); From 3ceb05e565a9542d9d8bd160e878878ddc2c88a2 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Tue, 9 Feb 2021 04:50:39 +0000 Subject: [PATCH 04/16] Adding the Decrypt expression --- src/flatbuffers/Expr.fbs | 7 +++- .../edu/berkeley/cs/rise/opaque/Utils.scala | 8 ++++- .../cs/rise/opaque/expressions/Decrypt.scala | 33 +++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala diff --git a/src/flatbuffers/Expr.fbs b/src/flatbuffers/Expr.fbs index a96215b5a2..8c89cd0757 100644 --- a/src/flatbuffers/Expr.fbs +++ b/src/flatbuffers/Expr.fbs @@ -40,7 +40,8 @@ union ExprUnion { CreateArray, Upper, DateAdd, - DateAddInterval + DateAddInterval, + Decrypt } table Expr { @@ -221,4 +222,8 @@ table ClosestPoint { table Upper { child:Expr; +} + +table Decrypt { + child:Expr; } \ No newline at end of file diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 8902e3fc90..9d808a4f2c 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -822,13 +822,19 @@ object Utils extends Logging { tuix.ExprUnion.Col, tuix.Col.createCol(builder, colNum)) - case (Literal(value, dataType), Nil) => + case (Literal(value, dataType), Nil) => val valueOffset = flatbuffersCreateField(builder, value, dataType, (value == null)) tuix.Expr.createExpr( builder, tuix.ExprUnion.Literal, tuix.Literal.createLiteral(builder, valueOffset)) + case (Decrypt(Literal(value, StringType), dataType), Seq(childOffset)) => + tuix.Expr.createExpr( + builder, + tuix.ExprUnion.Decrypt, + tuix.Decrypt.createDecrypt(builder, childOffset)) + case (Alias(child, _), Seq(childOffset)) => // TODO: Use an expression for aliases so we can refer to them elsewhere in the expression // tree. For now we just ignore them when evaluating expressions. diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala new file mode 100644 index 0000000000..b6a0820847 --- /dev/null +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala @@ -0,0 +1,33 @@ +package edu.berkeley.cs.rise.opaque.expressions + +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.expressions.BinaryOperator +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.NullIntolerant +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.DataTypes +import org.apache.spark.sql.types.DoubleType + +object Decrypt { + def decrypt(v: Column): Column = new Column(Decrypt(v.expr)) +} + +case class Decrypt(child: Expression, dataType: DataType) + extends UnaryOperator with NullIntolerant with CodegenFallback { + + override def dataType: DataType = dataType + + override def inputType = StringType + + override def symbol: String = "decrypt" + + override def sqlOperator: String = "decrypt" + + protected override def nullSafeEval(input: StringType): Any = { + // TODO: Implement this function so that we can test against Spark + val v = input.asInstanceOf[StringType] + Utils.decrypt(v) + } +} From 8e0b01178e173b23abf6355f101ed1e4d890283a Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Fri, 12 Feb 2021 05:48:42 +0000 Subject: [PATCH 05/16] WIP --- src/enclave/Enclave/ExpressionEvaluation.h | 37 +++++++-- src/enclave/Enclave/util.cpp | 76 +++++++++++++++++++ src/enclave/Enclave/util.h | 2 + src/flatbuffers/Expr.fbs | 2 +- .../edu/berkeley/cs/rise/opaque/Utils.scala | 3 +- .../cs/rise/opaque/expressions/Decrypt.scala | 31 ++++---- .../cs/rise/opaque/OpaqueOperatorTests.scala | 9 ++- 7 files changed, 131 insertions(+), 29 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index a817073aba..6e1768bb30 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -290,15 +290,38 @@ class FlatbuffersExpressionEvaluator { case tuix::ExprUnion_Decrypt: { - auto add = static_cast(expr->expr()); - auto left_offset = eval_helper(row, add->left()); - auto right_offset = eval_helper(row, add->right()); + auto decrypt = static_cast(expr->expr()); + const tuix::Field *value = + flatbuffers::GetTemporaryPointer(builder, eval_helper(row, decrypt->value())); - return eval_binary_arithmetic_op( - builder, - flatbuffers::GetTemporaryPointer(builder, left_offset), - flatbuffers::GetTemporaryPointer(builder, right_offset)); + if (value->value_type() != tuix::FieldUnion_StringField) { + throw std::runtime_error( + std::string("tuix::Decrypt only accepts a string input, not ") + + std::string(tuix::EnumNameFieldUnion(value->value_type()))); + } + bool result_is_null = value->is_null(); + if (!result_is_null) { + auto str_field = static_cast(value->value()); + + std::vector str_vec( + flatbuffers::VectorIterator(str_field->value()->Data(), + static_cast(0)), + flatbuffers::VectorIterator(str_field->value()->Data(), + static_cast(str_field->length()))); + + std::string ciphertext(str_vec.begin(), str_vec.end()); + auto plaintext = ciphertext_base64_decode(ciphertext); + + const tuix::Field *field = flatbuffers::GetRoot(plaintext.data()); + printf("Decrypted field is "); + print(field); + + return flatbuffers_copy(field, builder); + } else { + throw std::runtime_error(std::string("tuix::Decrypt does not accept a NULL string\n")); + } + } case tuix::ExprUnion_Cast: diff --git a/src/enclave/Enclave/util.cpp b/src/enclave/Enclave/util.cpp index 0f13e6af49..7735231d63 100644 --- a/src/enclave/Enclave/util.cpp +++ b/src/enclave/Enclave/util.cpp @@ -142,3 +142,79 @@ int secs_to_tm(long long t, struct tm *tm) { return 0; } + +// Code adapted from https://stackoverflow.com/questions/180947/base64-decode-snippet-in-c +/* + Copyright (C) 2004-2008 Rene Nyffenegger + + This source code is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this source code must not be misrepresented; you must not + claim that you wrote the original source code. If you use this source code + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original source code. + + 3. This notice may not be removed or altered from any source distribution. + + Rene Nyffenegger rene.nyffenegger@adp-gmbh.ch + +*/ + +static const std::string base64_chars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + +static inline bool is_base64(unsigned char c) { + return (isalnum(c) || (c == '+') || (c == '/')); +} + +std::string ciphertext_base64_decode(std::string const& encoded_string) { + int in_len = encoded_string.size(); + int i = 0; + int j = 0; + int in_ = 0; + uint8_t char_array_4[4], char_array_3[3]; + std::string ret; + + while (in_len-- && ( encoded_string[in_] != '=') && is_base64(encoded_string[in_])) { + char_array_4[i++] = encoded_string[in_]; in_++; + if (i ==4) { + for (i = 0; i <4; i++) + char_array_4[i] = base64_chars.find(char_array_4[i]); + + char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4); + char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2); + char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3]; + + for (i = 0; (i < 3); i++) + ret += char_array_3[i]; + i = 0; + } + } + + if (i) { + for (j = i; j <4; j++) + char_array_4[j] = 0; + + for (j = 0; j <4; j++) + char_array_4[j] = base64_chars.find(char_array_4[j]); + + char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4); + char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2); + char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3]; + + for (j = 0; (j < i - 1); j++) ret += char_array_3[j]; + } + + return ret; +} diff --git a/src/enclave/Enclave/util.h b/src/enclave/Enclave/util.h index b4e0b52327..7d9664f295 100644 --- a/src/enclave/Enclave/util.h +++ b/src/enclave/Enclave/util.h @@ -41,4 +41,6 @@ int pow_2(int value); int secs_to_tm(long long t, struct tm *tm); +std::string ciphertext_base64_decode(std::string const &encoded_string); + #endif // UTIL_H diff --git a/src/flatbuffers/Expr.fbs b/src/flatbuffers/Expr.fbs index 8c89cd0757..c61c797386 100644 --- a/src/flatbuffers/Expr.fbs +++ b/src/flatbuffers/Expr.fbs @@ -225,5 +225,5 @@ table Upper { } table Decrypt { - child:Expr; + value:Expr; } \ No newline at end of file diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 9d808a4f2c..60149865ee 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -106,6 +106,7 @@ import edu.berkeley.cs.rise.opaque.execution.Block import edu.berkeley.cs.rise.opaque.execution.OpaqueOperatorExec import edu.berkeley.cs.rise.opaque.execution.SGXEnclave import edu.berkeley.cs.rise.opaque.expressions.ClosestPoint +import edu.berkeley.cs.rise.opaque.expressions.Decrypt import edu.berkeley.cs.rise.opaque.expressions.DotProduct import edu.berkeley.cs.rise.opaque.expressions.VectorAdd import edu.berkeley.cs.rise.opaque.expressions.VectorMultiply @@ -829,7 +830,7 @@ object Utils extends Logging { tuix.ExprUnion.Literal, tuix.Literal.createLiteral(builder, valueOffset)) - case (Decrypt(Literal(value, StringType), dataType), Seq(childOffset)) => + case (Decrypt(child, dataType), Seq(childOffset)) => tuix.Expr.createExpr( builder, tuix.ExprUnion.Decrypt, diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala index b6a0820847..68d441b17d 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala @@ -1,33 +1,30 @@ package edu.berkeley.cs.rise.opaque.expressions +import edu.berkeley.cs.rise.opaque.Utils + import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.expressions.BinaryOperator +import org.apache.spark.sql.catalyst.expressions.UnaryExpression import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.NullIntolerant import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.DataTypes -import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String object Decrypt { - def decrypt(v: Column): Column = new Column(Decrypt(v.expr)) + def decrypt(v: Column, dataType: DataType): Column = new Column(Decrypt(v.expr, dataType)) } -case class Decrypt(child: Expression, dataType: DataType) - extends UnaryOperator with NullIntolerant with CodegenFallback { - - override def dataType: DataType = dataType - - override def inputType = StringType - - override def symbol: String = "decrypt" +// TODO: write expression description +case class Decrypt(child: Expression, outputDataType: DataType) + extends UnaryExpression with NullIntolerant with CodegenFallback { - override def sqlOperator: String = "decrypt" + override def dataType: DataType = outputDataType - protected override def nullSafeEval(input: StringType): Any = { - // TODO: Implement this function so that we can test against Spark - val v = input.asInstanceOf[StringType] - Utils.decrypt(v) + protected override def nullSafeEval(input: Any): Any = { + // This function is implemented so that we can test against Spark + val v = input.asInstanceOf[UTF8String].toString + Utils.decryptScalar(v) } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index b05d870e7b..bfc5df84be 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -853,10 +853,13 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => } testOpaqueOnly("encrypted literal") { securityLevel => - val str = "hello world" - val enc_str = Utils.encryptScalar(str, StringType) + val input = 123 + val enc_str = Utils.encryptScalar(input, IntegerType) val dec_str = Utils.decryptScalar(enc_str) - println(s"dec_str = ${dec_str}") + + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "word", "count") + } testAgainstSpark("scalar subquery", ignore) { securityLevel => From 4e630ed01db708225e645dd3cd7b6d330f726a7c Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Fri, 12 Feb 2021 20:12:47 +0000 Subject: [PATCH 06/16] Modified Decrypt expression to be nondeterministic --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 1 + .../cs/rise/opaque/expressions/Decrypt.scala | 16 +++++++++++++--- .../cs/rise/opaque/OpaqueOperatorTests.scala | 9 ++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 3c2b708be5..2a5ceaa0da 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -856,6 +856,7 @@ object Utils extends Logging { tuix.Literal.createLiteral(builder, valueOffset)) case (Decrypt(child, dataType), Seq(childOffset)) => + println("Decrypt serialization") tuix.Expr.createExpr( builder, tuix.ExprUnion.Decrypt, diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala index 68d441b17d..8c2d9254ca 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala @@ -3,9 +3,11 @@ package edu.berkeley.cs.rise.opaque.expressions import edu.berkeley.cs.rise.opaque.Utils import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.expressions.UnaryExpression +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.NullIntolerant +import org.apache.spark.sql.catalyst.expressions.Nondeterministic +import org.apache.spark.sql.catalyst.expressions.UnaryExpression import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.DataTypes @@ -18,12 +20,20 @@ object Decrypt { // TODO: write expression description case class Decrypt(child: Expression, outputDataType: DataType) - extends UnaryExpression with NullIntolerant with CodegenFallback { + extends UnaryExpression with NullIntolerant with CodegenFallback with Nondeterministic { override def dataType: DataType = outputDataType + protected def initializeInternal(partitionIndex: Int): Unit = { } + + protected override def evalInternal(input: InternalRow): Any = { + val v = child.eval() + nullSafeEval(v) + } + protected override def nullSafeEval(input: Any): Any = { - // This function is implemented so that we can test against Spark + // This function is implemented so that we can test against Spark; + // should never be used in production because we want to keep the literal encrypted val v = input.asInstanceOf[UTF8String].toString Utils.decryptScalar(v) } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 7d177f5f5a..ffa2d94f8b 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -35,6 +35,7 @@ import org.apache.spark.unsafe.types.CalendarInterval import edu.berkeley.cs.rise.opaque.benchmark._ import edu.berkeley.cs.rise.opaque.execution.EncryptedBlockRDDScanExec +import edu.berkeley.cs.rise.opaque.expressions.Decrypt.decrypt import edu.berkeley.cs.rise.opaque.expressions.DotProduct.dot import edu.berkeley.cs.rise.opaque.expressions.VectorMultiply.vectormultiply import edu.berkeley.cs.rise.opaque.expressions.VectorSum @@ -880,13 +881,15 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => } testOpaqueOnly("encrypted literal") { securityLevel => - val input = 123 + val input = 10 val enc_str = Utils.encryptScalar(input, IntegerType) - val dec_str = Utils.decryptScalar(enc_str) val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "word", "count") - + val df = words.filter($"id" < decrypt(lit(enc_str), IntegerType)).sort($"id") + df.explain + df.show() + df } testAgainstSpark("scalar subquery", ignore) { securityLevel => From f741a52e59aed3bba62ad16c1309d5b541be189d Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Fri, 12 Feb 2021 21:05:34 +0000 Subject: [PATCH 07/16] Encrypted literal works --- src/enclave/Enclave/ExpressionEvaluation.h | 21 +++++++++++-------- .../cs/rise/opaque/OpaqueOperatorTests.scala | 6 ++---- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index e9e183a090..4f65395278 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -290,9 +290,9 @@ class FlatbuffersExpressionEvaluator { case tuix::ExprUnion_Decrypt: { - auto decrypt = static_cast(expr->expr()); + auto decrypt_expr = static_cast(expr->expr()); const tuix::Field *value = - flatbuffers::GetTemporaryPointer(builder, eval_helper(row, decrypt->value())); + flatbuffers::GetTemporaryPointer(builder, eval_helper(row, decrypt_expr->value())); if (value->value_type() != tuix::FieldUnion_StringField) { throw std::runtime_error( @@ -311,13 +311,16 @@ class FlatbuffersExpressionEvaluator { static_cast(str_field->length()))); std::string ciphertext(str_vec.begin(), str_vec.end()); - auto plaintext = ciphertext_base64_decode(ciphertext); - - const tuix::Field *field = flatbuffers::GetRoot(plaintext.data()); - printf("Decrypted field is "); - print(field); - - return flatbuffers_copy(field, builder); + std::string ciphertext_decoded = ciphertext_base64_decode(ciphertext); + + uint8_t *plaintext = new uint8_t[ciphertext_decoded.size()]; + decrypt(reinterpret_cast(ciphertext_decoded.data()), ciphertext_decoded.size(), plaintext); + + const tuix::Field *field = flatbuffers::GetRoot(plaintext); + auto ret = flatbuffers_copy(field, builder); + + delete plaintext; + return ret; } else { throw std::runtime_error(std::string("tuix::Decrypt does not accept a NULL string\n")); } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index ffa2d94f8b..5b4dd8508e 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -880,16 +880,14 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted } - testOpaqueOnly("encrypted literal") { securityLevel => + testAgainstSpark("encrypted literal") { securityLevel => val input = 10 val enc_str = Utils.encryptScalar(input, IntegerType) val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "word", "count") val df = words.filter($"id" < decrypt(lit(enc_str), IntegerType)).sort($"id") - df.explain - df.show() - df + df.collect() } testAgainstSpark("scalar subquery", ignore) { securityLevel => From 0509786c6ead87eac1a31d55bed0aedfb8c9de69 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Sat, 13 Feb 2021 02:02:20 +0000 Subject: [PATCH 08/16] WIP --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 25 +++++++++++++++++++ .../cs/rise/opaque/execution/operators.scala | 6 ++++- .../cs/rise/opaque/OpaqueOperatorTests.scala | 2 +- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 2a5ceaa0da..9a4e7ed082 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -94,6 +94,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.ArrayBasedMapData import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.catalyst.util.MapData +import org.apache.spark.sql.execution.SubqueryExec import org.apache.spark.sql.execution.ScalarSubquery import org.apache.spark.sql.execution.aggregate.ScalaUDAF import org.apache.spark.sql.types._ @@ -1152,6 +1153,30 @@ object Utils extends Logging { // TODO: Implement decimal serialization, followed by CheckOverflow childOffset + case (ScalarSubquery(SubqueryExec(name, child), exprId), Seq()) => + val output = child.output(0) + val dataType = output match { + case AttributeReference(name, dataType, _, _) => dataType + case _ => throw new OpaqueException("Scalar subquery cannot match to AttributeReference") + } + // Need to deserialize the encrypted blocks to get the encrypted block + val buf = ByteBuffer.wrap(child.asInstanceOf[OpaqueOperatorExec].collectEncrypted()(0).bytes) + val encryptedBlocks = tuix.EncryptedBlocks.getRootAsEncryptedBlocks(buf) + assert(encryptedBlocks.blocksLength == 1) + val encryptedBlock = encryptedBlocks.blocks(0) + val ciphertextBuf = encryptedBlock.encRowsAsByteBuffer + val ciphertext = new Array[Byte](ciphertextBuf.remaining) + ciphertextBuf.get(ciphertext) + val ciphertext_str = Base64.getEncoder().encodeToString(ciphertext) + val value = decryptScalar(ciphertext_str) + println(s"value = ${value}") + // flatbuffersSerializeExpression( + // builder, + // Decrypt(Literal(UTF8String.fromString(ciphertext_str), StringType), dataType), + // input + // ) + 0 + case (_, Seq(childOffset)) => throw new OpaqueException("Expression not supported: " + expr.toString()) } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 4375aa16de..4088ef6bc1 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -141,8 +141,12 @@ trait OpaqueOperatorExec extends SparkPlan { // throw new UnsupportedOperationException("use executeBlocked") } + def collectEncrypted(): Array[Block] = { + executeBlocked().collect() + } + override def executeCollect(): Array[InternalRow] = { - executeBlocked().collect().flatMap { block => + collectEncrypted().flatMap { block => Utils.decryptBlockFlatbuffers(block) } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 5b4dd8508e..0107d56d08 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -890,7 +890,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => df.collect() } - testAgainstSpark("scalar subquery", ignore) { securityLevel => + testOpaqueOnly("scalar subquery") { securityLevel => // Example taken from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2728434780191932/1483312212640900/6987336228780374/latest.html val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "word", "count") From e682a89534095b106f591cd20578718709e49211 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Sat, 13 Feb 2021 03:01:22 +0000 Subject: [PATCH 09/16] Scalar subquery unit test passes --- src/enclave/Enclave/ExpressionEvaluation.h | 8 +++-- .../edu/berkeley/cs/rise/opaque/Utils.scala | 31 +++++++++++++------ .../cs/rise/opaque/OpaqueOperatorTests.scala | 5 ++- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 4f65395278..8a83d7dbe8 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -315,8 +315,12 @@ class FlatbuffersExpressionEvaluator { uint8_t *plaintext = new uint8_t[ciphertext_decoded.size()]; decrypt(reinterpret_cast(ciphertext_decoded.data()), ciphertext_decoded.size(), plaintext); - - const tuix::Field *field = flatbuffers::GetRoot(plaintext); + + BufferRefView buf(plaintext, ciphertext_decoded.size()); + buf.verify(); + + const tuix::Rows *rows = buf.root(); + const tuix::Field *field = rows->rows()->Get(0)->field_values()->Get(0); auto ret = flatbuffers_copy(field, builder); delete plaintext; diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 9a4e7ed082..015733afbe 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -682,7 +682,19 @@ object Utils extends Logging { case _ => value } - builder.finish(flatbuffersCreateField(builder, v, dataType, false)) + // TODO: the NULL variable for field value could be set to true + builder.finish( + tuix.Rows.createRows( + builder, + tuix.Rows.createRowsVector( + builder, + Array(tuix.Row.createRow( + builder, + tuix.Row.createFieldValuesVector( + builder, + Array(flatbuffersCreateField(builder, v, dataType, false))), + false))))) + val plaintext = builder.sizedByteArray() val ciphertext = encrypt(plaintext) val ciphertext_str = Base64.getEncoder().encodeToString(ciphertext); @@ -692,7 +704,9 @@ object Utils extends Logging { def decryptScalar(ciphertext: String): Any = { val ciphertext_bytes = Base64.getDecoder().decode(ciphertext); val plaintext = decrypt(ciphertext_bytes) - val field = tuix.Field.getRootAsField(ByteBuffer.wrap(plaintext)) + val rows = tuix.Rows.getRootAsRows(ByteBuffer.wrap(plaintext)) + val row = rows.rows(0) + val field = row.fieldValues(0) val value = flatbuffersExtractFieldValue(field) value } @@ -1168,14 +1182,11 @@ object Utils extends Logging { val ciphertext = new Array[Byte](ciphertextBuf.remaining) ciphertextBuf.get(ciphertext) val ciphertext_str = Base64.getEncoder().encodeToString(ciphertext) - val value = decryptScalar(ciphertext_str) - println(s"value = ${value}") - // flatbuffersSerializeExpression( - // builder, - // Decrypt(Literal(UTF8String.fromString(ciphertext_str), StringType), dataType), - // input - // ) - 0 + flatbuffersSerializeExpression( + builder, + Decrypt(Literal(UTF8String.fromString(ciphertext_str), StringType), dataType), + input + ) case (_, Seq(childOffset)) => throw new OpaqueException("Expression not supported: " + expr.toString()) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 0107d56d08..a25e58dfe2 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -890,15 +890,14 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => df.collect() } - testOpaqueOnly("scalar subquery") { securityLevel => + testAgainstSpark("scalar subquery") { securityLevel => // Example taken from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2728434780191932/1483312212640900/6987336228780374/latest.html - val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val data = for (i <- 0 until 256) yield (i, abc(i), i) val words = makeDF(data, securityLevel, "id", "word", "count") words.createTempView("words") try { val df = spark.sql("""SELECT id, word, (SELECT MAX(count) FROM words) max_age FROM words ORDER BY id, word""") - df.explain(true) df.collect } finally { spark.catalog.dropTempView("words") From 759b2c72e4ffff3b0b7ca154114e9beba7552abe Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Sat, 13 Feb 2021 03:04:54 +0000 Subject: [PATCH 10/16] More TPC-H queries supported --- src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala | 1 - src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 015733afbe..54a703a668 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -871,7 +871,6 @@ object Utils extends Logging { tuix.Literal.createLiteral(builder, valueOffset)) case (Decrypt(child, dataType), Seq(childOffset)) => - println("Decrypt serialization") tuix.Expr.createExpr( builder, tuix.ExprUnion.Decrypt, diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index ed8da375c5..6a0e567adc 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -68,7 +68,7 @@ trait TPCHTests extends OpaqueTestsBase { self => tpch.query(10, securityLevel, spark.sqlContext, numPartitions).collect } - testAgainstSpark("TPC-H 11", ignore) { securityLevel => + testAgainstSpark("TPC-H 11") { securityLevel => tpch.query(11, securityLevel, spark.sqlContext, numPartitions).collect } @@ -84,7 +84,7 @@ trait TPCHTests extends OpaqueTestsBase { self => tpch.query(14, securityLevel, spark.sqlContext, numPartitions).collect.toSet } - testAgainstSpark("TPC-H 15", ignore) { securityLevel => + testAgainstSpark("TPC-H 15") { securityLevel => tpch.query(15, securityLevel, spark.sqlContext, numPartitions).collect } @@ -112,7 +112,7 @@ trait TPCHTests extends OpaqueTestsBase { self => tpch.query(21, securityLevel, spark.sqlContext, numPartitions).collect } - testAgainstSpark("TPC-H 22", ignore) { securityLevel => + testAgainstSpark("TPC-H 22") { securityLevel => tpch.query(22, securityLevel, spark.sqlContext, numPartitions).collect } } From f123077812425fb5f60b6a16773b69f31e6b74fa Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Sat, 13 Feb 2021 03:07:17 +0000 Subject: [PATCH 11/16] Also turn on TPC-H 4 --- src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index 6a0e567adc..8d60dfa550 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -40,7 +40,7 @@ trait TPCHTests extends OpaqueTestsBase { self => tpch.query(3, securityLevel, spark.sqlContext, numPartitions).collect } - testAgainstSpark("TPC-H 4", ignore) { securityLevel => + testAgainstSpark("TPC-H 4") { securityLevel => tpch.query(4, securityLevel, spark.sqlContext, numPartitions).collect } From 6c7ff1b4521cb7a7a9fe25c642d08870a4d5da4c Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Sun, 14 Feb 2021 02:27:33 +0000 Subject: [PATCH 12/16] Debug message --- src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 54a703a668..9e1feffd86 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1175,6 +1175,7 @@ object Utils extends Logging { // Need to deserialize the encrypted blocks to get the encrypted block val buf = ByteBuffer.wrap(child.asInstanceOf[OpaqueOperatorExec].collectEncrypted()(0).bytes) val encryptedBlocks = tuix.EncryptedBlocks.getRootAsEncryptedBlocks(buf) + println(s"Has ${encryptedBlocks.blocksLength} blocks") assert(encryptedBlocks.blocksLength == 1) val encryptedBlock = encryptedBlocks.blocks(0) val ciphertextBuf = encryptedBlock.encRowsAsByteBuffer From 4b57ad0fc3cd8291f2e552b88618213514e0cd36 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Mon, 15 Feb 2021 04:04:54 +0000 Subject: [PATCH 13/16] Add checks for block length --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 9e1feffd86..663412ae91 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1173,20 +1173,29 @@ object Utils extends Logging { case _ => throw new OpaqueException("Scalar subquery cannot match to AttributeReference") } // Need to deserialize the encrypted blocks to get the encrypted block - val buf = ByteBuffer.wrap(child.asInstanceOf[OpaqueOperatorExec].collectEncrypted()(0).bytes) - val encryptedBlocks = tuix.EncryptedBlocks.getRootAsEncryptedBlocks(buf) - println(s"Has ${encryptedBlocks.blocksLength} blocks") - assert(encryptedBlocks.blocksLength == 1) - val encryptedBlock = encryptedBlocks.blocks(0) - val ciphertextBuf = encryptedBlock.encRowsAsByteBuffer - val ciphertext = new Array[Byte](ciphertextBuf.remaining) - ciphertextBuf.get(ciphertext) - val ciphertext_str = Base64.getEncoder().encodeToString(ciphertext) - flatbuffersSerializeExpression( - builder, - Decrypt(Literal(UTF8String.fromString(ciphertext_str), StringType), dataType), - input - ) + val blockList = child.asInstanceOf[OpaqueOperatorExec].collectEncrypted() + val encryptedBlocksList = blockList.map { block => + val buf = ByteBuffer.wrap(block.bytes) + tuix.EncryptedBlocks.getRootAsEncryptedBlocks(buf) + } + val encryptedBlocks = encryptedBlocksList.find(_.blocksLength > 0).getOrElse(encryptedBlocksList(0)) + println(s"encryptedBlocks = ${encryptedBlocks}") + if (encryptedBlocks.blocksLength == 0) { + // If empty, the returned result is null + flatbuffersSerializeExpression(builder, Literal(null, dataType), input) + } else { + assert(encryptedBlocks.blocksLength == 1) + val encryptedBlock = encryptedBlocks.blocks(0) + val ciphertextBuf = encryptedBlock.encRowsAsByteBuffer + val ciphertext = new Array[Byte](ciphertextBuf.remaining) + ciphertextBuf.get(ciphertext) + val ciphertext_str = Base64.getEncoder().encodeToString(ciphertext) + flatbuffersSerializeExpression( + builder, + Decrypt(Literal(UTF8String.fromString(ciphertext_str), StringType), dataType), + input + ) + } case (_, Seq(childOffset)) => throw new OpaqueException("Expression not supported: " + expr.toString()) From 0b14b3bed8d542a082c5d676c7b49746e1b44cb6 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Mon, 15 Feb 2021 04:30:52 +0000 Subject: [PATCH 14/16] Add expression description --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 4 +++- .../cs/rise/opaque/expressions/Decrypt.scala | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 663412ae91..698edbf9ff 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -682,6 +682,8 @@ object Utils extends Logging { case _ => value } + val isNull = (value == null) + // TODO: the NULL variable for field value could be set to true builder.finish( tuix.Rows.createRows( @@ -693,7 +695,7 @@ object Utils extends Logging { tuix.Row.createFieldValuesVector( builder, Array(flatbuffersCreateField(builder, v, dataType, false))), - false))))) + isNull))))) val plaintext = builder.sizedByteArray() val ciphertext = encrypt(plaintext) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala index 8c2d9254ca..813c3e6706 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala @@ -5,6 +5,7 @@ import edu.berkeley.cs.rise.opaque.Utils import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.ExpressionDescription import org.apache.spark.sql.catalyst.expressions.NullIntolerant import org.apache.spark.sql.catalyst.expressions.Nondeterministic import org.apache.spark.sql.catalyst.expressions.UnaryExpression @@ -18,7 +19,18 @@ object Decrypt { def decrypt(v: Column, dataType: DataType): Column = new Column(Decrypt(v.expr, dataType)) } -// TODO: write expression description +@ExpressionDescription( + usage = """ + _FUNC_(child, outputDataType) - Decrypt the input evaluated expression, which should always be a string + """, + arguments = """ + Arguments: + * child - an encrypted literal of string type + * outputDataType - the decrypted data type + """) +/** + * + */ case class Decrypt(child: Expression, outputDataType: DataType) extends UnaryExpression with NullIntolerant with CodegenFallback with Nondeterministic { @@ -27,7 +39,7 @@ case class Decrypt(child: Expression, outputDataType: DataType) protected def initializeInternal(partitionIndex: Int): Unit = { } protected override def evalInternal(input: InternalRow): Any = { - val v = child.eval() + val v = input.getUTF8String(0) nullSafeEval(v) } From 2302f65cfdaa55a4e431f2c0c561d388199bcab3 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Mon, 15 Feb 2021 04:48:52 +0000 Subject: [PATCH 15/16] Change decrypt impl --- .../scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala index 813c3e6706..cfe90aeffd 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala @@ -39,7 +39,7 @@ case class Decrypt(child: Expression, outputDataType: DataType) protected def initializeInternal(partitionIndex: Int): Unit = { } protected override def evalInternal(input: InternalRow): Any = { - val v = input.getUTF8String(0) + val v = child.eval() nullSafeEval(v) } From f8ce9813a96776bbcc789c51943f25d2e0734641 Mon Sep 17 00:00:00 2001 From: Wenting Zheng Date: Thu, 18 Feb 2021 18:57:26 +0000 Subject: [PATCH 16/16] Address comments --- src/enclave/Enclave/ExpressionEvaluation.h | 2 +- src/enclave/Enclave/util.cpp | 2 +- src/enclave/Enclave/util.h | 2 +- src/flatbuffers/Expr.fbs | 2 +- src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala | 2 +- .../edu/berkeley/cs/rise/opaque/execution/operators.scala | 2 +- .../edu/berkeley/cs/rise/opaque/expressions/ClosestPoint.scala | 3 --- .../edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala | 3 --- .../edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala | 2 +- 9 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 8a83d7dbe8..0f48c56d48 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -313,7 +313,7 @@ class FlatbuffersExpressionEvaluator { std::string ciphertext(str_vec.begin(), str_vec.end()); std::string ciphertext_decoded = ciphertext_base64_decode(ciphertext); - uint8_t *plaintext = new uint8_t[ciphertext_decoded.size()]; + uint8_t *plaintext = new uint8_t[dec_size(ciphertext_decoded.size())]; decrypt(reinterpret_cast(ciphertext_decoded.data()), ciphertext_decoded.size(), plaintext); BufferRefView buf(plaintext, ciphertext_decoded.size()); diff --git a/src/enclave/Enclave/util.cpp b/src/enclave/Enclave/util.cpp index 7735231d63..6cd2a898b0 100644 --- a/src/enclave/Enclave/util.cpp +++ b/src/enclave/Enclave/util.cpp @@ -178,7 +178,7 @@ static inline bool is_base64(unsigned char c) { return (isalnum(c) || (c == '+') || (c == '/')); } -std::string ciphertext_base64_decode(std::string const& encoded_string) { +std::string ciphertext_base64_decode(const std::string &encoded_string) { int in_len = encoded_string.size(); int i = 0; int j = 0; diff --git a/src/enclave/Enclave/util.h b/src/enclave/Enclave/util.h index 7d9664f295..df80ba7cd0 100644 --- a/src/enclave/Enclave/util.h +++ b/src/enclave/Enclave/util.h @@ -41,6 +41,6 @@ int pow_2(int value); int secs_to_tm(long long t, struct tm *tm); -std::string ciphertext_base64_decode(std::string const &encoded_string); +std::string ciphertext_base64_decode(const std::string &encoded_string); #endif // UTIL_H diff --git a/src/flatbuffers/Expr.fbs b/src/flatbuffers/Expr.fbs index c61c797386..4acce5e53d 100644 --- a/src/flatbuffers/Expr.fbs +++ b/src/flatbuffers/Expr.fbs @@ -226,4 +226,4 @@ table Upper { table Decrypt { value:Expr; -} \ No newline at end of file +} diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 698edbf9ff..4c6970e489 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -872,6 +872,7 @@ object Utils extends Logging { tuix.ExprUnion.Literal, tuix.Literal.createLiteral(builder, valueOffset)) + // This expression should never be evaluated on the driver case (Decrypt(child, dataType), Seq(childOffset)) => tuix.Expr.createExpr( builder, @@ -1181,7 +1182,6 @@ object Utils extends Logging { tuix.EncryptedBlocks.getRootAsEncryptedBlocks(buf) } val encryptedBlocks = encryptedBlocksList.find(_.blocksLength > 0).getOrElse(encryptedBlocksList(0)) - println(s"encryptedBlocks = ${encryptedBlocks}") if (encryptedBlocks.blocksLength == 0) { // If empty, the returned result is null flatbuffersSerializeExpression(builder, Literal(null, dataType), input) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 4088ef6bc1..4eb941157e 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -142,7 +142,7 @@ trait OpaqueOperatorExec extends SparkPlan { } def collectEncrypted(): Array[Block] = { - executeBlocked().collect() + executeBlocked().collect } override def executeCollect(): Array[InternalRow] = { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/ClosestPoint.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/ClosestPoint.scala index b4f1e27200..7eac3c990c 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/ClosestPoint.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/ClosestPoint.scala @@ -29,9 +29,6 @@ object ClosestPoint { * point - list of coordinates representing a point * centroids - list of lists of coordinates, each representing a point """) -/** - * - */ case class ClosestPoint(left: Expression, right: Expression) extends BinaryExpression with NullIntolerant with CodegenFallback with ExpectsInputTypes { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala index cfe90aeffd..a52ecb113e 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/expressions/Decrypt.scala @@ -28,9 +28,6 @@ object Decrypt { * child - an encrypted literal of string type * outputDataType - the decrypted data type """) -/** - * - */ case class Decrypt(child: Expression, outputDataType: DataType) extends UnaryExpression with NullIntolerant with CodegenFallback with Nondeterministic { diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index a25e58dfe2..a69894d13c 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -887,7 +887,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "word", "count") val df = words.filter($"id" < decrypt(lit(enc_str), IntegerType)).sort($"id") - df.collect() + df.collect } testAgainstSpark("scalar subquery") { securityLevel =>