Skip to content

Commit 0f877d4

Browse files
authored
Cluster Remote Attestation Fix (#146)
The existing code only had RA working when run locally. This PR adds a sleep for 5 seconds to make sure that all executors are spun up successfully before attestation begins. Closes #147
1 parent 0d69b7b commit 0f877d4

File tree

2 files changed

+19
-6
lines changed

2 files changed

+19
-6
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ After downloading the Opaque codebase, build and test it as follows.
6262

6363
Next, run Apache Spark SQL queries with Opaque as follows, assuming [Spark 3.0](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz) (`wget http://apache.mirrors.pair.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz`) is already installed:
6464

65+
\* Opaque needs Spark's `'spark.executor.instances'` property to be set. This can be done in a custom config file, the default config file found at `/opt/spark/conf/spark-defaults.conf`, or as a `spark-submit` or `spark-shell` argument: `--conf 'spark.executor.instances=<value>`.
66+
6567
1. Package Opaque into a JAR:
6668
6769
```sh

src/main/scala/edu/berkeley/cs/rise/opaque/RA.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,40 @@ import org.apache.spark.internal.Logging
2222

2323
import edu.berkeley.cs.rise.opaque.execution.SP
2424

25-
// Helper to handle remote attestation
26-
//
25+
// Performs remote attestation for all executors
26+
// that have not been attested yet
2727

2828
object RA extends Logging {
2929
def initRA(sc: SparkContext): Unit = {
3030

31-
val rdd = sc.makeRDD(Seq.fill(sc.defaultParallelism) { () })
31+
// All executors need to be initialized before attestation can occur
32+
var numExecutors = 1
33+
if (!sc.isLocal) {
34+
numExecutors = sc.getConf.getInt("spark.executor.instances", -1)
35+
while (!sc.isLocal && sc.getExecutorMemoryStatus.size < numExecutors) {}
36+
}
37+
38+
val rdd = sc.parallelize(Seq.fill(numExecutors) {()}, numExecutors)
3239
val intelCert = Utils.findResource("AttestationReportSigningCACert.pem")
3340
val sp = new SP()
3441

3542
sp.Init(Utils.sharedKey, intelCert)
3643

37-
val msg1s = rdd.mapPartitionsWithIndex { (i, _) =>
44+
// Runs on executors
45+
val msg1s = rdd.mapPartitions { (_) =>
3846
val (enclave, eid) = Utils.initEnclave()
3947
val msg1 = enclave.GenerateReport(eid)
4048
Iterator((eid, msg1))
4149
}.collect.toMap
4250

51+
// Runs on driver
4352
val msg2s = msg1s.map{case (eid, msg1) => (eid, sp.ProcessEnclaveReport(msg1))}
4453

45-
val attestationResults = rdd.mapPartitionsWithIndex { (_, _) =>
54+
// Runs on executors
55+
val attestationResults = rdd.mapPartitions { (_) =>
4656
val (enclave, eid) = Utils.initEnclave()
47-
enclave.FinishAttestation(eid, msg2s(eid))
57+
val msg2 = msg2s(eid)
58+
enclave.FinishAttestation(eid, msg2)
4859
Iterator((eid, true))
4960
}.collect.toMap
5061

0 commit comments

Comments
 (0)