diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
new file mode 100644
index 000000000000..918e59350dac
--- /dev/null
+++ b/.github/workflows/benchmark.yml
@@ -0,0 +1,99 @@
+name: Run benchmarks
+
+on:
+  workflow_dispatch:
+    inputs:
+      class:
+        description: 'Benchmark class'
+        required: true
+        default: '*'
+      jdk:
+        description: 'JDK version: 8 or 11'
+        required: true
+        default: '8'
+      failfast:
+        description: 'Failfast: true or false'
+        required: true
+        default: 'true'
+      num-splits:
+        description: 'Number of job splits'
+        required: true
+        default: '1'
+
+jobs:
+  matrix-gen:
+    name: Generate matrix for job splits
+    runs-on: ubuntu-20.04
+    outputs:
+      matrix: ${{ steps.set-matrix.outputs.matrix }}
+    env:
+      SPARK_BENCHMARK_NUM_SPLITS: ${{ github.event.inputs.num-splits }}
+    steps:
+    - name: Generate matrix
+      id: set-matrix
+      run: echo "::set-output name=matrix::["`seq -s, 1 $SPARK_BENCHMARK_NUM_SPLITS`"]"
+
+  benchmark:
+    name: "Run benchmarks: ${{ github.event.inputs.class }} (JDK ${{ github.event.inputs.jdk }}, ${{ matrix.split }} out of ${{ github.event.inputs.num-splits }} splits)"
+    needs: matrix-gen
+    # Ubuntu 20.04 is the latest LTS. The next LTS is 22.04.
+    runs-on: ubuntu-20.04
+    strategy:
+      fail-fast: false
+      matrix:
+        split: ${{fromJSON(needs.matrix-gen.outputs.matrix)}}
+    env:
+      SPARK_BENCHMARK_FAILFAST: ${{ github.event.inputs.failfast }}
+      SPARK_BENCHMARK_NUM_SPLITS: ${{ github.event.inputs.num-splits }}
+      SPARK_BENCHMARK_CUR_SPLIT: ${{ matrix.split }}
+      SPARK_GENERATE_BENCHMARK_FILES: 1
+    steps:
+    - name: Checkout Spark repository
+      uses: actions/checkout@v2
+      # In order to get diff files
+      with:
+        fetch-depth: 0
+    - name: Cache Scala, SBT and Maven
+      uses: actions/cache@v2
+      with:
+        path: |
+          build/apache-maven-*
+          build/scala-*
+          build/*.jar
+          ~/.sbt
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
+        restore-keys: |
+          build-
+    - name: Cache Coursier local repository
+      uses: actions/cache@v2
+      with:
+        path: ~/.cache/coursier
+        key: benchmark-coursier-${{ github.event.inputs.jdk }}-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
+        restore-keys: |
+          benchmark-coursier-${{ github.event.inputs.jdk }}
+    - name: Install Java ${{ github.event.inputs.jdk }}
+      uses: actions/setup-java@v1
+      with:
+        java-version: ${{ github.event.inputs.jdk }}
+    - name: Run benchmarks
+      run: |
+        ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl test:package
+        # Make less noisy
+        cp conf/log4j.properties.template conf/log4j.properties
+        sed -i 's/log4j.rootCategory=INFO, console/log4j.rootCategory=WARN, console/g' conf/log4j.properties
+        # In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has a 7 GB memory limit.
+        bin/spark-submit \
+          --driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \
+          --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
+          "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+          "${{ github.event.inputs.class }}"
+        # To keep the directory structure and file permissions, tar them
+        # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
+        echo "Preparing the benchmark results:"
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}.tar `git diff --name-only`
+    - name: Upload benchmark results
+      uses: actions/upload-artifact@v2
+      with:
+        name: benchmark-results-${{ github.event.inputs.jdk }}-${{ matrix.split }}
+        path: benchmark-results-${{ github.event.inputs.jdk }}.tar
+
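The SPARK_BENCHMARK_NUM_SPLITS and SPARK_BENCHMARK_CUR_SPLIT variables are what connect the workflow matrix to the runner: with num-splits set to 3, the matrix-gen step emits "::set-output name=matrix::[1,2,3]" (seq -s, 1 3 prints "1,2,3"), and each of the three jobs then keeps only its own share of the discovered benchmarks. Below is a minimal sketch of that round-robin rule, separate from the patch itself; the benchmark names and the fallback defaults are placeholders.

import java.util.Locale

object SplitDemo {
  def main(args: Array[String]): Unit = {
    // Same env-var parsing the runner uses; defaults here are made up.
    val numOfSplits = sys.env.get("SPARK_BENCHMARK_NUM_SPLITS")
      .map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(3)
    val currentSplit = sys.env.get("SPARK_BENCHMARK_CUR_SPLIT")
      .map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0)

    var numBenchmark = 0
    Seq("A", "B", "C", "D", "E", "F", "G").foreach { name =>
      numBenchmark += 1
      // The i-th discovered benchmark runs only in the job whose
      // zero-based split index equals i % numOfSplits.
      if (numBenchmark % numOfSplits == currentSplit) {
        println(s"split ${currentSplit + 1}/$numOfSplits runs benchmark $name")
      }
    }
  }
}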
diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
index 485120e06e5f..cce58c35f90e 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
@@ -48,7 +48,7 @@ abstract class BenchmarkBase {
     val jdkString = if (version > 8) s"-jdk$version" else ""
     val resultFileName =
       s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffix-results.txt"
-    val prefix = args.headOption.map(_ + "/").getOrElse("")
+    val prefix = Benchmarks.currentProjectRoot.map(_ + "/").getOrElse("")
     val file = new File(s"${prefix}benchmarks/$resultFileName")
     if (!file.exists()) {
       file.createNewFile()
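This one-line change switches the result-file prefix from a program argument to a variable that the Benchmarks runner (next hunk) sets before invoking each benchmark's main method. A stripped-down sketch of that handshake, not part of the patch; RunnerSketch, its method, and the example paths are all illustrative.

import java.io.File

object RunnerSketch {
  // The runner publishes the sub-project root it derived from the jar's
  // location; the base class reads it back when naming the result file.
  var currentProjectRoot: Option[String] = None

  def resultFile(resultFileName: String): File = {
    val prefix = currentProjectRoot.map(_ + "/").getOrElse("")
    new File(s"${prefix}benchmarks/$resultFileName")
  }

  def main(args: Array[String]): Unit = {
    currentProjectRoot = Some("sql/core")
    // Prints sql/core/benchmarks/JsonBenchmark-results.txt
    println(resultFile("JsonBenchmark-results.txt"))
  }
}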
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
index 101f6f8df8a8..2bb70bc75f6b 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
@@ -18,6 +18,8 @@ package org.apache.spark.benchmark
 
 import java.io.File
 import java.lang.reflect.Modifier
+import java.nio.file.{FileSystems, Paths}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -30,56 +32,113 @@ import com.google.common.reflect.ClassPath
  *
  * {{{
  *   1. with spark-submit
- *      bin/spark-submit --class <this class> --jars <spark test jars>
+ *      bin/spark-submit --class <this class>
+ *        --jars <all spark test jars>,<spark avro test jar>
+ *        <spark core test jar> <glob pattern for class>
  *   2. generate result:
- *      SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class> --jars
- *        <spark test jars>
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class>
+ *        --jars <all spark test jars>,<spark avro test jar>
+ *        <spark core test jar> <glob pattern for class>
 *      Results will be written to all corresponding files under "benchmarks/".
 *      Notice that it detects the sub-project's directories from jar's paths so the provided jars
 *      should be properly placed under target (Maven build) or target/scala-* (SBT) when you
 *      generate the files.
 * }}}
 *
- * In Mac, you can use a command as below to find all the test jars.
+ * You can use a command as below to find all the test jars.
+ * Make sure not to select duplicated jars created by different versions of builds or tools.
 * {{{
- *   find . -name "*3.2.0-SNAPSHOT-tests.jar" | paste -sd ',' -
+ *   find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
 * }}}
 *
- * Full command example:
+ * The example below runs all benchmarks and generates the results:
 * {{{
 *   SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
 *     org.apache.spark.benchmark.Benchmarks --jars \
- *     "`find . -name "*3.2.0-SNAPSHOT-tests.jar" | paste -sd ',' -`" \
- *     ./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
+ *     "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
+ *     "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ *     "*"
+ * }}}
+ *
+ * The example below runs all benchmarks under "org.apache.spark.sql.execution.datasources":
+ * {{{
+ *   bin/spark-submit --class \
+ *     org.apache.spark.benchmark.Benchmarks --jars \
+ *     "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
+ *     "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ *     "org.apache.spark.sql.execution.datasources.*"
 * }}}
 */
 object Benchmarks {
+  var currentProjectRoot: Option[String] = None
+
   def main(args: Array[String]): Unit = {
-    ClassPath.from(
+    val isFailFast = sys.env.get(
+      "SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true)
+    val numOfSplits = sys.env.get(
+      "SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1)
+    val currentSplit = sys.env.get(
+      "SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0)
+    var numBenchmark = 0
+
+    var isBenchmarkFound = false
+    val benchmarkClasses = ClassPath.from(
       Thread.currentThread.getContextClassLoader
-    ).getTopLevelClassesRecursive("org.apache.spark").asScala.foreach { info =>
+    ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
+    val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")
+
+    benchmarkClasses.foreach { info =>
       lazy val clazz = info.load
       lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
       // isAssignableFrom seems not working with the reflected class from Guava's
       // getTopLevelClassesRecursive.
+      require(args.length > 0, "Benchmark class to run should be specified.")
       if (
         info.getName.endsWith("Benchmark") &&
+        // TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks
+        !info.getName.endsWith("TPCDSQueryBenchmark") &&
+        matcher.matches(Paths.get(info.getName)) &&
         Try(runBenchmark).isSuccess && // Does this has a main method?
         !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
       ) {
-        val targetDirOrProjDir =
-          new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
-            .getParentFile.getParentFile
-        val projDir = if (targetDirOrProjDir.getName == "target") {
-          // SBT build
-          targetDirOrProjDir.getParentFile.getCanonicalPath
-        } else {
-          // Maven build
-          targetDirOrProjDir.getCanonicalPath
+        numBenchmark += 1
+        if (numBenchmark % numOfSplits == currentSplit) {
+          isBenchmarkFound = true
+
+          val targetDirOrProjDir =
+            new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
+              .getParentFile.getParentFile
+
+          // The root path to be referred to in each benchmark.
+          currentProjectRoot = Some {
+            if (targetDirOrProjDir.getName == "target") {
+              // SBT build
+              targetDirOrProjDir.getParentFile.getCanonicalPath
+            } else {
+              // Maven build
+              targetDirOrProjDir.getCanonicalPath
+            }
+          }
+
+          // scalastyle:off println
+          println(s"Running ${clazz.getName}:")
+          // scalastyle:on println
+          // Force GC to minimize the side effect.
+          System.gc()
+          try {
+            runBenchmark.invoke(null, args.tail.toArray)
+          } catch {
+            case e: Throwable if !isFailFast =>
+              // scalastyle:off println
+              println(s"${clazz.getName} failed with the exception below:")
+              // scalastyle:on println
+              e.printStackTrace()
+          }
         }
-        runBenchmark.invoke(null, Array(projDir))
       }
     }
+
+    if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to run.")
  }
 }
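A note on the glob matching above: the runner hands the fully-qualified class name to a java.nio PathMatcher as a Path. Because a class name contains no '/' separators, the whole dotted name is a single path element, so a lone '*' can span all of its segments. A self-contained demonstration, separate from the patch; the class names are just examples.

import java.nio.file.{FileSystems, Paths}

object GlobDemo {
  def main(args: Array[String]): Unit = {
    val matcher = FileSystems.getDefault
      .getPathMatcher("glob:org.apache.spark.sql.execution.datasources.*")
    // true: '*' does not cross '/', but there is no '/' here, so it
    // consumes the remaining dotted suffix "json.JsonBenchmark".
    println(matcher.matches(
      Paths.get("org.apache.spark.sql.execution.datasources.json.JsonBenchmark")))
    // false: different package prefix.
    println(matcher.matches(Paths.get("org.apache.spark.rdd.CoalescedRDDBenchmark")))
  }
}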
diff --git a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
index f62c561ec550..b622e0b1d6e1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
@@ -78,4 +78,10 @@ object CoalescedRDDBenchmark extends BenchmarkBase {
       coalescedRDD(numIters)
     }
   }
+
+  override def afterAll(): Unit = {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 8962e923cccf..e7f83cb7eb4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -57,6 +57,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
     TaskContext.setTaskContext(taskContext)
     f
     sc.stop()
+    TaskContext.unset()
   }
 
   private def testRows(numRows: Int): Seq[UnsafeRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala
index 0ed0126add7a..c025670fb895 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SubExprEliminationBenchmark.scala
@@ -45,7 +45,7 @@ object SubExprEliminationBenchmark extends SqlBasedBenchmark {
 
     withTempPath { path =>
       prepareDataInfo(benchmark)
-      val numCols = 1000
+      val numCols = 500
       val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)
 
       val cols = (0 until numCols).map { idx =>
@@ -84,7 +84,7 @@ object SubExprEliminationBenchmark extends SqlBasedBenchmark {
 
     withTempPath { path =>
      prepareDataInfo(benchmark)
-      val numCols = 1000
+      val numCols = 500
       val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)
 
       val predicate = (0 until numCols).map { idx =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
index 98abe8daac67..f84172278bef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
@@ -89,4 +89,8 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
 
     schema
   }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+  }
 }
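The additions above are all teardown: afterAll() stops the SparkContext or SparkSession, and TaskContext.unset() clears the thread-local that setTaskContext installed, so no state leaks into the next benchmark running in the same JVM. The generic shape of that pattern is sketched below, outside the patch; withGlobalState is a made-up helper name and the println calls stand in for real setup and cleanup.

object TeardownSketch {
  // try/finally guarantees tearDown runs even if the benchmark body throws,
  // which matters when the runner catches failures and keeps going.
  def withGlobalState[T](setUp: () => Unit, tearDown: () => Unit)(f: => T): T = {
    setUp()
    try f finally tearDown()
  }

  def main(args: Array[String]): Unit = {
    withGlobalState(() => println("set TaskContext"), () => println("unset TaskContext")) {
      println("run benchmark body")
    }
  }
}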
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index ffe8e66f3368..e4f6ccaa9a62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -519,16 +519,16 @@ object JsonBenchmark extends SqlBasedBenchmark {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     val numIters = 3
     runBenchmark("Benchmark for performance of JSON parsing") {
-      schemaInferring(100 * 1000 * 1000, numIters)
-      countShortColumn(100 * 1000 * 1000, numIters)
-      countWideColumn(10 * 1000 * 1000, numIters)
-      countWideRow(500 * 1000, numIters)
-      selectSubsetOfColumns(10 * 1000 * 1000, numIters)
-      jsonParserCreation(10 * 1000 * 1000, numIters)
-      jsonFunctions(10 * 1000 * 1000, numIters)
-      jsonInDS(50 * 1000 * 1000, numIters)
-      jsonInFile(50 * 1000 * 1000, numIters)
-      datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
+      schemaInferring(5 * 1000 * 1000, numIters)
+      countShortColumn(5 * 1000 * 1000, numIters)
+      countWideColumn(1000 * 1000, numIters)
+      countWideRow(50 * 1000, numIters)
+      selectSubsetOfColumns(1000 * 1000, numIters)
+      jsonParserCreation(1000 * 1000, numIters)
+      jsonFunctions(1000 * 1000, numIters)
+      jsonInDS(5 * 1000 * 1000, numIters)
+      jsonInFile(5 * 1000 * 1000, numIters)
+      datetimeBenchmark(rowsNum = 1000 * 1000, numIters)
       // Benchmark pushdown filters that refer to top-level columns.
       // TODO (SPARK-32325): Add benchmarks for filters with nested column attributes.
       filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)