From 423846f3a23a8d92cb3dfc0ef928f7728019c6bc Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Wed, 31 Mar 2021 21:59:10 +0900
Subject: [PATCH 1/8] Set up a workflow for developers to run benchmarks in their fork

---
 .github/workflows/benchmark.yml | 68 +++++++++++++++++++
 .../apache/spark/benchmark/Benchmarks.scala | 35 ++++++++--
 .../spark/rdd/CoalescedRDDBenchmark.scala | 6 ++
 ...nalAppendOnlyUnsafeRowArrayBenchmark.scala | 1 +
 .../benchmark/SqlBasedBenchmark.scala | 4 ++
 5 files changed, 108 insertions(+), 6 deletions(-)
 create mode 100644 .github/workflows/benchmark.yml

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
new file mode 100644
index 000000000000..f3045e7b2c76
--- /dev/null
+++ b/.github/workflows/benchmark.yml
@@ -0,0 +1,68 @@
+name: Run benchmarks
+
+on:
+  workflow_dispatch:
+    inputs:
+      class:
+        description: 'Benchmark class'
+        required: true
+        default: '*'
+      jdk:
+        description: 'JDK version: 8 or 11'
+        required: true
+        default: '8'
+
+jobs:
+  benchmark:
+    name: "Run benchmarks: ${{ github.event.inputs.class }} (JDK ${{ github.event.inputs.jdk }})"
+    # Ubuntu 20.04 is the latest LTS. The next LTS is 22.04.
+    runs-on: ubuntu-20.04
+    steps:
+    - name: Checkout Spark repository
+      uses: actions/checkout@v2
+      # In order to get the diff files
+      with:
+        fetch-depth: 0
+    - name: Cache Scala, SBT and Maven
+      uses: actions/cache@v2
+      with:
+        path: |
+          build/apache-maven-*
+          build/scala-*
+          build/*.jar
+          ~/.sbt
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
+        restore-keys: |
+          build-
+    - name: Cache Coursier local repository
+      uses: actions/cache@v2
+      with:
+        path: ~/.cache/coursier
+        key: benchmark-coursier-${{ github.event.inputs.jdk }}-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
+        restore-keys: |
+          benchmark-coursier-${{ github.event.inputs.jdk }}
+    - name: Install Java ${{ github.event.inputs.jdk }}
+      uses: actions/setup-java@v1
+      with:
+        java-version: ${{ github.event.inputs.jdk }}
+    - name: Run benchmarks
+      run: |
+        ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl test:package
+        # Make the logs less noisy
+        cp conf/log4j.properties.template conf/log4j.properties
+        sed -i 's/log4j.rootCategory=INFO, console/log4j.rootCategory=WARN, console/g' conf/log4j.properties
+        # In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has a 7 GB memory limit.
+        SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit \
+          --driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \
+          --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
+          "`find . 
-name 'spark-core*-SNAPSHOT-tests.jar'`" \
+          "${{ github.event.inputs.class }}"
+        # To keep the directory structure and file permissions, tar them
+        # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}.tar `git diff --name-only`
+    - name: Upload benchmark results
+      uses: actions/upload-artifact@v2
+      with:
+        name: benchmark-results-${{ github.event.inputs.jdk }}
+        path: benchmark-results-${{ github.event.inputs.jdk }}.tar
+
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index 101f6f8df8a8..8fb9ca4a6a35 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -18,6 +18,7 @@ package org.apache.spark.benchmark
 
 import java.io.File
 import java.lang.reflect.Modifier
+import java.nio.file.{FileSystems, Paths}
 
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -30,10 +31,12 @@ import com.google.common.reflect.ClassPath
  *
  * {{{
  *   1. with spark-submit
- *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *      bin/spark-submit --class <this class> --jars <spark test jars>
+ *        <spark core test jar> <glob pattern for class>
  *   2. generate result:
  *      SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class> --jars <spark test jars>
- *        <spark core test jar>
+ *        <spark core test jar> <glob pattern for class>
+ *
  *      Results will be written to all corresponding files under "benchmarks/".
  *      Notice that it detects the sub-project's directories from jar's paths so the provided jars
  *      should be properly placed under target (Maven build) or target/scala-* (SBT) when you
@@ -41,21 +44,33 @@ import com.google.common.reflect.ClassPath
  *      generate the files.
  * }}}
  *
  * In Mac, you can use a command as below to find all the test jars.
+ * Make sure not to select duplicated jars created by different versions of builds or tools.
  * {{{
- *   find . -name "*3.2.0-SNAPSHOT-tests.jar" | paste -sd ',' -
+ *   find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
  * }}}
  *
- * Full command example:
+ * The example below runs all benchmarks and generates the results:
  * {{{
  *   SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
  *     org.apache.spark.benchmark.Benchmarks --jars \
- *     "`find . -name "*3.2.0-SNAPSHOT-tests.jar" | paste -sd ',' -`" \
- *     ./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
+ *     "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
+ *     "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ *     "*"
  * }}}
+ *
+ * The example below runs all benchmarks under "org.apache.spark.sql.execution.datasources":
+ * {{{
+ *   bin/spark-submit --class \
+ *     org.apache.spark.benchmark.Benchmarks --jars \
+ *     "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
+ *     "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ *     "org.apache.spark.sql.execution.datasources.*"
+ * }}}
  */
 object Benchmarks {
 
   def main(args: Array[String]): Unit = {
+    var isBenchmarkFound = false
     ClassPath.from(
       Thread.currentThread.getContextClassLoader
     ).getTopLevelClassesRecursive("org.apache.spark").asScala.foreach { info =>
@@ -63,11 +78,15 @@ object Benchmarks {
       lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
       // isAssignableFrom does not seem to work with the reflected class from Guava's
       // getTopLevelClassesRecursive. 
+      val matcher = args.headOption.map(pattern =>
+        FileSystems.getDefault.getPathMatcher(s"glob:$pattern"))
       if (
         info.getName.endsWith("Benchmark") &&
+        matcher.forall(_.matches(Paths.get(info.getName))) &&
         Try(runBenchmark).isSuccess && // Does this have a main method?
         !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
       ) {
+        isBenchmarkFound = true
         val targetDirOrProjDir =
           new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
             .getParentFile.getParentFile
@@ -78,8 +97,12 @@ object Benchmarks {
           // Maven build
           targetDirOrProjDir.getCanonicalPath
         }
+        // Force GC to minimize the side effect.
+        System.gc()
         runBenchmark.invoke(null, Array(projDir))
       }
     }
+
+    if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to run.")
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
index f62c561ec550..b622e0b1d6e1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
@@ -78,4 +78,10 @@ object CoalescedRDDBenchmark extends BenchmarkBase {
       coalescedRDD(numIters)
     }
   }
+
+  override def afterAll(): Unit = {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 8962e923cccf..e7f83cb7eb4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -57,6 +57,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
     TaskContext.setTaskContext(taskContext)
     f
     sc.stop()
+    TaskContext.unset()
   }
 
   private def testRows(numRows: Int): Seq[UnsafeRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
index 98abe8daac67..f84172278bef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
@@ -89,4 +89,8 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
 
     schema
   }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+  }
 }
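A note on the matching introduced in the patch above: the `class` argument is compared against fully-qualified class names with a `java.nio.file` glob matcher. Since a dotted class name contains no path separators, a single `*` spans the whole name, dots included. A minimal, REPL-pasteable Scala sketch of the behavior (`SomeBenchmark` is only an illustrative name):

    import java.nio.file.{FileSystems, Paths}

    // A whole class name is a single path element, so "*" spans all of it, dots included.
    val everything = FileSystems.getDefault.getPathMatcher("glob:*")
    everything.matches(Paths.get("org.apache.spark.rdd.CoalescedRDDBenchmark"))  // true

    // Package-style patterns treat the dots as literal characters.
    val sqlOnly = FileSystems.getDefault.getPathMatcher("glob:org.apache.spark.sql.*")
    sqlOnly.matches(Paths.get("org.apache.spark.sql.SomeBenchmark"))             // true
    sqlOnly.matches(Paths.get("org.apache.spark.rdd.CoalescedRDDBenchmark"))     // false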
From 3df045fc761a4d4545189a0c551afe4babb3aad9 Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Thu, 1 Apr 2021 00:45:37 +0900
Subject: [PATCH 2/8] Address Yuming's comment

---
 core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index 8fb9ca4a6a35..8466ffef3942 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -82,6 +82,8 @@ object Benchmarks {
         FileSystems.getDefault.getPathMatcher(s"glob:$pattern"))
       if (
         info.getName.endsWith("Benchmark") &&
+        // TPCDSQueryBenchmark requires setup and arguments. Exclude it for now.
+        !info.getName.endsWith("TPCDSQueryBenchmark") &&
         matcher.forall(_.matches(Paths.get(info.getName))) &&
         Try(runBenchmark).isSuccess && // Does this have a main method?
         !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?

From fb3f54a58993a780cf765feedf60b86d15f163dd Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Thu, 1 Apr 2021 12:02:23 +0900
Subject: [PATCH 3/8] Address comments, clean up and work around a JDK 11 issue

---
 .../spark/benchmark/BenchmarkBase.scala | 2 +-
 .../apache/spark/benchmark/Benchmarks.scala | 53 ++++++++++++-------
 2 files changed, 36 insertions(+), 19 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
index 485120e06e5f..cce58c35f90e 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
@@ -48,7 +48,7 @@ abstract class BenchmarkBase {
     val jdkString = if (version > 8) s"-jdk$version" else ""
     val resultFileName =
       s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffix-results.txt"
-    val prefix = args.headOption.map(_ + "/").getOrElse("")
+    val prefix = Benchmarks.currentProjectRoot.map(_ + "/").getOrElse("")
     val file = new File(s"${prefix}benchmarks/$resultFileName")
     if (!file.exists()) {
       file.createNewFile()
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index 8466ffef3942..c9847cdf5918 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -31,12 +31,13 @@ import com.google.common.reflect.ClassPath
 *
 * {{{
 *   1. with spark-submit
- *      bin/spark-submit --class <this class> --jars <spark test jars>
- *        <spark core test jar> <glob pattern for class>
+ *      bin/spark-submit --class <this class>
+ *        --jars <all spark test jars>,<spark external package jars>
+ *        <spark core test jar> <glob pattern for class>
 *   2. generate result:
- *      SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class> --jars <spark test jars>
- *        <spark core test jar> <glob pattern for class>
- *
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class>
+ *        --jars <all spark test jars>,<spark external package jars>
+ *        <spark core test jar> <glob pattern for class>
 *      Results will be written to all corresponding files under "benchmarks/".
 *      Notice that it detects the sub-project's directories from jar's paths so the provided jars
 *      should be properly placed under target (Maven build) or target/scala-* (SBT) when you
@@ -69,22 +70,33 @@ import com.google.common.reflect.ClassPath
 */
 object Benchmarks {
 
+  var currentProjectRoot: Option[String] = None
+
   def main(args: Array[String]): Unit = {
     var isBenchmarkFound = false
-    ClassPath.from(
+    val benchmarkClasses = ClassPath.from(
       Thread.currentThread.getContextClassLoader
-    ).getTopLevelClassesRecursive("org.apache.spark").asScala.foreach { info =>
+    ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
+
+    // TODO(SPARK-34929): In JDK 11, MapStatusesSerDeserBenchmark (which fails to start) seems
+    // to affect other benchmark cases by growing the task size.
+    val reorderedBenchmarkClasses =
+      benchmarkClasses.filterNot(_.getName.endsWith("MapStatusesSerDeserBenchmark")) ++
+      benchmarkClasses.find(_.getName.endsWith("MapStatusesSerDeserBenchmark"))
+
+    val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")
+
+    reorderedBenchmarkClasses.foreach { info =>
       lazy val clazz = info.load
       lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
       // isAssignableFrom does not seem to work with the reflected class from Guava's
       // getTopLevelClassesRecursive. 
-      val matcher = args.headOption.map(pattern =>
-        FileSystems.getDefault.getPathMatcher(s"glob:$pattern"))
+      require(args.length > 0, "Benchmark class to run should be specified.")
       if (
         info.getName.endsWith("Benchmark") &&
-        // TPCDSQueryBenchmark requires setup and arguments. Exclude it for now.
+        // TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks
         !info.getName.endsWith("TPCDSQueryBenchmark") &&
-        matcher.forall(_.matches(Paths.get(info.getName))) &&
+        matcher.matches(Paths.get(info.getName)) &&
         Try(runBenchmark).isSuccess && // Does this have a main method?
         !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
       ) {
@@ -92,16 +104,21 @@ object Benchmarks {
         isBenchmarkFound = true
         val targetDirOrProjDir =
           new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
             .getParentFile.getParentFile
-        val projDir = if (targetDirOrProjDir.getName == "target") {
-          // SBT build
-          targetDirOrProjDir.getParentFile.getCanonicalPath
-        } else {
-          // Maven build
-          targetDirOrProjDir.getCanonicalPath
+
+        // The root path to be referred to in each benchmark.
+        currentProjectRoot = Some {
+          if (targetDirOrProjDir.getName == "target") {
+            // SBT build
+            targetDirOrProjDir.getParentFile.getCanonicalPath
+          } else {
+            // Maven build
+            targetDirOrProjDir.getCanonicalPath
+          }
         }
+
         // Force GC to minimize the side effect.
         System.gc()
-        runBenchmark.invoke(null, Array(projDir))
+        runBenchmark.invoke(null, args.tail.toArray)
       }
     }
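To make the project-root detection above concrete: `currentProjectRoot` is derived from the location of the jar that defines the benchmark class, and `BenchmarkBase` prepends it to the `benchmarks/` output path. A minimal Scala sketch of that derivation, reusing the SBT jar path from the earlier documentation as the example input:

    import java.io.File

    val jar = new File("./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar")
    val targetDirOrProjDir = jar.getParentFile.getParentFile  // ./core/target
    val projectRoot = if (targetDirOrProjDir.getName == "target") {
      targetDirOrProjDir.getParentFile.getCanonicalPath       // SBT build: ./core
    } else {
      targetDirOrProjDir.getCanonicalPath                     // Maven build: already the project dir
    }
    // Result files then land under s"$projectRoot/benchmarks/".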
From 0978310711cd410a2a25716e89ea727eede9cbd9 Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Thu, 1 Apr 2021 17:07:56 +0900
Subject: [PATCH 4/8] Fix SubExprEliminationBenchmark

---
 .../scala/org/apache/spark/benchmark/Benchmarks.scala | 9 +--------
 .../sql/execution/SubExprEliminationBenchmark.scala | 4 ++--
 2 files changed, 3 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index c9847cdf5918..e090435356c2 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -77,16 +77,9 @@ object Benchmarks {
     val benchmarkClasses = ClassPath.from(
       Thread.currentThread.getContextClassLoader
     ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
-
-    // TODO(SPARK-34929): In JDK 11, MapStatusesSerDeserBenchmark (which fails to start) seems
-    // to affect other benchmark cases by growing the task size.
-    val reorderedBenchmarkClasses =
-      benchmarkClasses.filterNot(_.getName.endsWith("MapStatusesSerDeserBenchmark")) ++
-      benchmarkClasses.find(_.getName.endsWith("MapStatusesSerDeserBenchmark"))
-
     val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")
-
-    reorderedBenchmarkClasses.foreach { info =>
+
+    benchmarkClasses.foreach { info =>
       lazy val clazz = info.load
       lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
       // isAssignableFrom does not seem to work with the reflected class from Guava's
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala
index 0ed0126add7a..c025670fb895 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala
@@ -45,7 +45,7 @@ object SubExprEliminationBenchmark extends SqlBasedBenchmark {
 
     withTempPath { path =>
       prepareDataInfo(benchmark)
-      val numCols = 1000
+      val numCols = 500
       val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)
 
       val cols = (0 until numCols).map { idx =>
@@ -84,7 +84,7 @@ object SubExprEliminationBenchmark extends SqlBasedBenchmark {
 
     withTempPath { path =>
       prepareDataInfo(benchmark)
-      val numCols = 1000
+      val numCols = 500
       val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)
 
       val predicate = (0 until numCols).map { idx =>

From ecf7a39621c41091acfcfe88af262d10d4df87e6 Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Fri, 2 Apr 2021 11:43:59 +0900
Subject: [PATCH 5/8] Fix benchmark tests and add failfast mode

---
 .github/workflows/benchmark.yml | 7 +++++++
 .../org/apache/spark/benchmark/Benchmarks.scala | 15 +++++++++++++--
 .../execution/benchmark/ExtractBenchmark.scala | 5 +++--
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index f3045e7b2c76..6aed60e3615c 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -11,12 +11,18 @@ on:
         description: 'JDK version: 8 or 11'
         required: true
         default: '8'
+      failfast:
+        description: 'Failfast: true or false'
+        required: true
+        default: 'true'
 
 jobs:
   benchmark:
     name: "Run benchmarks: ${{ github.event.inputs.class }} (JDK ${{ github.event.inputs.jdk }})"
     # Ubuntu 20.04 is the latest LTS. The next LTS is 22.04.
     runs-on: ubuntu-20.04
+    env:
+      SPARK_BENCHMARK_FAILFAST: ${{ github.event.inputs.failfast }}
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v2
@@ -59,6 +65,7 @@ jobs:
         "${{ github.event.inputs.class }}"
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
+        echo "Preparing the benchmark results:"
         tar -cvf benchmark-results-${{ github.event.inputs.jdk }}.tar `git diff --name-only`
     - name: Upload benchmark results
       uses: actions/upload-artifact@v2
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index e090435356c2..bc8b8dd2ee87 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -19,6 +19,7 @@ package org.apache.spark.benchmark
 import java.io.File
 import java.lang.reflect.Modifier
 import java.nio.file.{FileSystems, Paths}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -44,7 +45,7 @@ import com.google.common.reflect.ClassPath
 *      generate the files.
 * }}}
 *
- * In Mac, you can use a command as below to find all the test jars.
+ * You can use a command as below to find all the test jars.
 * Make sure not to select duplicated jars created by different versions of builds or tools.
 * {{{
 *   find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
@@ -73,6 +74,8 @@ object Benchmarks {
   var currentProjectRoot: Option[String] = None
 
   def main(args: Array[String]): Unit = {
+    val isFailFast = sys.env.get(
+      "SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true)
     var isBenchmarkFound = false
     val benchmarkClasses = ClassPath.from(
       Thread.currentThread.getContextClassLoader
@@ -111,7 +114,15 @@ object Benchmarks {
 
         // Force GC to minimize the side effect.
         System.gc()
-        runBenchmark.invoke(null, args.tail.toArray)
+        try {
+          runBenchmark.invoke(null, args.tail.toArray)
+        } catch {
+          case e: Throwable if !isFailFast =>
+            // scalastyle:off println
+            println(s"${clazz.getName} failed with the exception below:")
+            // scalastyle:on println
+            e.printStackTrace()
+        }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
index 6af20e8696aa..ea9349c579bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
@@ -94,8 +94,9 @@ object ExtractBenchmark extends SqlBasedBenchmark {
     val intervalFields = Seq("YEAR", "MONTH", "DAY", "HOUR", "MINUTE", "SECOND")
     val settings = Map(
       "timestamp" -> datetimeFields,
-      "date" -> datetimeFields,
-      "interval" -> intervalFields)
+      "date" -> datetimeFields)
+      // TODO(SPARK-34938): Restore the benchmark for the interval case
+      // "interval" -> intervalFields)
 
     for {(dataType, fields) <- settings; func <- Seq("extract", "date_part")} {
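Since the fail-fast switch is read from the environment rather than from the workflow inputs, it also applies to local runs. A sketch reusing the spark-submit invocation documented in `Benchmarks` above, with fail-fast disabled so a single failing benchmark does not abort the rest:

    SPARK_BENCHMARK_FAILFAST=false SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
      org.apache.spark.benchmark.Benchmarks --jars \
      "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
      "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
      "*"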
From 60d3f0e3bbe00d7db460db90a9cb1e1bda1efdb6 Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Fri, 2 Apr 2021 15:47:44 +0900
Subject: [PATCH 6/8] Remove changes in ExtractBenchmark

---
 .../spark/sql/execution/benchmark/ExtractBenchmark.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
index ea9349c579bd..6af20e8696aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
@@ -94,9 +94,8 @@ object ExtractBenchmark extends SqlBasedBenchmark {
     val intervalFields = Seq("YEAR", "MONTH", "DAY", "HOUR", "MINUTE", "SECOND")
     val settings = Map(
       "timestamp" -> datetimeFields,
-      "date" -> datetimeFields)
-      // TODO(SPARK-34938): Restore the benchmark for the interval case
-      // "interval" -> intervalFields)
+      "date" -> datetimeFields,
+      "interval" -> intervalFields)
 
     for {(dataType, fields) <- settings; func <- Seq("extract", "date_part")} {

From 5eefec992b42689a56b1f07de0e5fddfb95aed2b Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Fri, 2 Apr 2021 20:37:02 +0900
Subject: [PATCH 7/8] Support splitting jobs

---
 .github/workflows/benchmark.yml | 28 +++++++++-
 .../apache/spark/benchmark/Benchmarks.scala | 56 +++++++++++--------
 2 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 6aed60e3615c..fe2469ef0c21 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -15,14 +15,38 @@ on:
         description: 'Failfast: true or false'
         required: true
         default: 'true'
+      num-splits:
+        description: 'Number of job splits'
+        required: true
+        default: '1'
 
 jobs:
+  matrix-gen:
+    name: Generate matrix for job splits
+    runs-on: ubuntu-20.04
+    outputs:
+      matrix: ${{ steps.set-matrix.outputs.matrix }}
+    env:
+      SPARK_BENCHMARK_NUM_SPLITS: ${{ github.event.inputs.num-splits }}
+    steps:
+    - name: Generate matrix
+      id: set-matrix
+      run: echo "::set-output name=matrix::["`seq -s, 1 $SPARK_BENCHMARK_NUM_SPLITS`"]"
+
   benchmark:
-    name: "Run benchmarks: ${{ github.event.inputs.class }} (JDK ${{ github.event.inputs.jdk }})"
+    
name: "Run benchmarks: ${{ github.event.inputs.class }} (JDK ${{ github.event.inputs.jdk }}, ${{ matrix.split }} out of ${{ github.event.inputs.num-splits }} splits)" + needs: matrix-gen # Ubuntu 20.04 is the latest LTS. The next LTS is 22.04. runs-on: ubuntu-20.04 + strategy: + fail-fast: false + matrix: + split: ${{fromJSON(needs.matrix-gen.outputs.matrix)}} env: SPARK_BENCHMARK_FAILFAST: ${{ github.event.inputs.failfast }} + SPARK_BENCHMARK_NUM_SPLITS: ${{ github.event.inputs.num-splits }} + SPARK_BENCHMARK_CUR_SPLIT: ${{ matrix.split }} + SPARK_GENERATE_BENCHMARK_FILES: 1 steps: - name: Checkout Spark repository uses: actions/checkout@v2 @@ -58,7 +82,7 @@ jobs: cp conf/log4j.properties.template conf/log4j.properties sed -i 's/log4j.rootCategory=INFO, console/log4j.rootCategory=WARN, console/g' conf/log4j.properties # In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has 7 GB memory limit. - SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit \ + bin/spark-submit \ --driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \ --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \ "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \ diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala index bc8b8dd2ee87..5d60bd374c20 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala @@ -76,6 +76,12 @@ object Benchmarks { def main(args: Array[String]): Unit = { val isFailFast = sys.env.get( "SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true) + val numOfSplits = sys.env.get( + "SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1) + val currentSplit = sys.env.get( + "SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0) + var numBenchmark = 0 + var isBenchmarkFound = false val benchmarkClasses = ClassPath.from( Thread.currentThread.getContextClassLoader @@ -96,32 +102,36 @@ object Benchmarks { Try(runBenchmark).isSuccess && // Does this has a main method? !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class? ) { - isBenchmarkFound = true - val targetDirOrProjDir = - new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI) - .getParentFile.getParentFile + numBenchmark += 1 + if (numBenchmark % numOfSplits == currentSplit) { + isBenchmarkFound = true + + val targetDirOrProjDir = + new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI) + .getParentFile.getParentFile - // The root path to be referred in each benchmark. - currentProjectRoot = Some { - if (targetDirOrProjDir.getName == "target") { - // SBT build - targetDirOrProjDir.getParentFile.getCanonicalPath - } else { - // Maven build - targetDirOrProjDir.getCanonicalPath + // The root path to be referred in each benchmark. + currentProjectRoot = Some { + if (targetDirOrProjDir.getName == "target") { + // SBT build + targetDirOrProjDir.getParentFile.getCanonicalPath + } else { + // Maven build + targetDirOrProjDir.getCanonicalPath + } } - } - // Force GC to minimize the side effect. 
From e6beeb59e6aaada24b5b4faaa20410c76e3c363f Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Fri, 2 Apr 2021 21:02:18 +0900
Subject: [PATCH 8/8] Add the matrix split to the output artifact name

---
 .github/workflows/benchmark.yml | 2 +-
 .../apache/spark/benchmark/Benchmarks.scala | 3 +++
 .../datasources/json/JsonBenchmark.scala | 20 +++++++++----------
 3 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index fe2469ef0c21..918e59350dac 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -94,6 +94,6 @@ jobs:
     - name: Upload benchmark results
       uses: actions/upload-artifact@v2
       with:
-        name: benchmark-results-${{ github.event.inputs.jdk }}
+        name: benchmark-results-${{ github.event.inputs.jdk }}-${{ matrix.split }}
         path: benchmark-results-${{ github.event.inputs.jdk }}.tar
 
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index 5d60bd374c20..2bb70bc75f6b 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -121,6 +121,9 @@ object Benchmarks {
           }
         }
 
+        // scalastyle:off println
+        println(s"Running ${clazz.getName}:")
+        // scalastyle:on println
         // Force GC to minimize the side effect. 
System.gc() try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala index ffe8e66f3368..e4f6ccaa9a62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala @@ -519,16 +519,16 @@ object JsonBenchmark extends SqlBasedBenchmark { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val numIters = 3 runBenchmark("Benchmark for performance of JSON parsing") { - schemaInferring(100 * 1000 * 1000, numIters) - countShortColumn(100 * 1000 * 1000, numIters) - countWideColumn(10 * 1000 * 1000, numIters) - countWideRow(500 * 1000, numIters) - selectSubsetOfColumns(10 * 1000 * 1000, numIters) - jsonParserCreation(10 * 1000 * 1000, numIters) - jsonFunctions(10 * 1000 * 1000, numIters) - jsonInDS(50 * 1000 * 1000, numIters) - jsonInFile(50 * 1000 * 1000, numIters) - datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters) + schemaInferring(5 * 1000 * 1000, numIters) + countShortColumn(5 * 1000 * 1000, numIters) + countWideColumn(1000 * 1000, numIters) + countWideRow(50 * 1000, numIters) + selectSubsetOfColumns(1000 * 1000, numIters) + jsonParserCreation(1000 * 1000, numIters) + jsonFunctions(1000 * 1000, numIters) + jsonInDS(5 * 1000 * 1000, numIters) + jsonInFile(5 * 1000 * 1000, numIters) + datetimeBenchmark(rowsNum = 1000 * 1000, numIters) // Benchmark pushdown filters that refer to top-level columns. // TODO (SPARK-32325): Add benchmarks for filters with nested column attributes. filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
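With the series applied, the whole loop can be driven from a fork. A sketch using the GitHub CLI; the flag names mirror the `workflow_dispatch` inputs defined above, while the class pattern and split values are only examples:

    # Trigger a run of all RDD benchmarks on JDK 8, in a single split.
    gh workflow run benchmark.yml -f class='org.apache.spark.rdd.*' -f jdk=8 \
      -f failfast=true -f num-splits=1

    # After it finishes, download the uploaded artifact (named per JDK and split as of
    # PATCH 8/8) and unpack it in the repository root. Since tar preserved paths and
    # permissions, `git diff` then shows the regenerated benchmark result files.
    gh run download --name benchmark-results-8-1
    tar -xvf benchmark-results-8.tar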