Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
name: Run benchmarks

# Manually triggered workflow; every input below is supplied from the "Run workflow" form.
on:
  workflow_dispatch:
    inputs:
      # Glob pattern matched against fully qualified benchmark class names.
      class:
        description: 'Benchmark class'
        required: true
        default: '*'
      jdk:
        description: 'JDK version: 8 or 11'
        required: true
        default: '8'
      # Exported to the benchmark job as SPARK_BENCHMARK_FAILFAST.
      failfast:
        description: 'Failfast: true or false'
        required: true
        default: 'true'
      # Number of parallel jobs the benchmarks are distributed over.
      num-splits:
        description: 'Number of job splits'
        required: true
        default: '1'
Copy link
Member Author

@HyukjinKwon HyukjinKwon Apr 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add this parameter because GitHub Actions limits a job's timeout to 6 hours (the workflow limit is 72 hours), and running the benchmarks sequentially takes up to 50 hours. This way, the benchmarks run in parallel, so I think it's okay, although it might expose too many parameters to control.

For example, I am now running all benchmarks in 20 splits (with JDK 11) here:

Screen Shot 2021-04-02 at 8 42 31 PM

which results in 20 jobs that run the benchmarks in parallel (hashed by 20)

Screen Shot 2021-04-02 at 8 42 43 PM


jobs:
  # Produces the list of split indices that the benchmark job fans out over.
  matrix-gen:
    name: Generate matrix for job splits
    runs-on: ubuntu-20.04
    outputs:
      matrix: ${{ steps.set-matrix.outputs.matrix }}
    env:
      SPARK_BENCHMARK_NUM_SPLITS: ${{ github.event.inputs.num-splits }}
    steps:
    - name: Generate matrix
      id: set-matrix
      # Emits a JSON array "[1,2,...,N]" where N is the requested number of splits.
      run: echo "::set-output name=matrix::["`seq -s, 1 $SPARK_BENCHMARK_NUM_SPLITS`"]"

  # One job per split; each job runs only the benchmarks assigned to its split.
  benchmark:
    name: "Run benchmarks: ${{ github.event.inputs.class }} (JDK ${{ github.event.inputs.jdk }}, ${{ matrix.split }} out of ${{ github.event.inputs.num-splits }} splits)"
    needs: matrix-gen
    # Ubuntu 20.04 is the latest LTS. The next LTS is 22.04.
    runs-on: ubuntu-20.04
    strategy:
      fail-fast: false
      matrix:
        split: ${{fromJSON(needs.matrix-gen.outputs.matrix)}}
    env:
      SPARK_BENCHMARK_FAILFAST: ${{ github.event.inputs.failfast }}
      SPARK_BENCHMARK_NUM_SPLITS: ${{ github.event.inputs.num-splits }}
      SPARK_BENCHMARK_CUR_SPLIT: ${{ matrix.split }}
      SPARK_GENERATE_BENCHMARK_FILES: 1
    steps:
    - name: Checkout Spark repository
      uses: actions/checkout@v2
      # In order to get diff files
      with:
        fetch-depth: 0
    - name: Cache Scala, SBT and Maven
      uses: actions/cache@v2
      with:
        path: |
          build/apache-maven-*
          build/scala-*
          build/*.jar
          ~/.sbt
        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
        restore-keys: |
          build-
    - name: Cache Coursier local repository
      uses: actions/cache@v2
      with:
        path: ~/.cache/coursier
        key: benchmark-coursier-${{ github.event.inputs.jdk }}-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
        restore-keys: |
          benchmark-coursier-${{ github.event.inputs.jdk }}
    - name: Install Java ${{ github.event.inputs.jdk }}
      uses: actions/setup-java@v1
      with:
        java-version: ${{ github.event.inputs.jdk }}
    - name: Run benchmarks
      env:
        # Pass user-supplied inputs through the environment instead of expanding them
        # directly into the script, so they can never be interpreted as shell syntax.
        BENCHMARK_CLASS: ${{ github.event.inputs.class }}
        BENCHMARK_JDK: ${{ github.event.inputs.jdk }}
      run: |
        ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl test:package
        # Make less noisy
        cp conf/log4j.properties.template conf/log4j.properties
        sed -i 's/log4j.rootCategory=INFO, console/log4j.rootCategory=WARN, console/g' conf/log4j.properties
        # In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has 7 GB memory limit.
        bin/spark-submit \
          --driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \
          --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
          "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
          "$BENCHMARK_CLASS"
        # To keep the directory structure and file permissions, tar them
        # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
        echo "Preparing the benchmark results:"
        # `git diff --name-only` lists only modified tracked files, so also collect untracked
        # files; otherwise newly created result files would be missing from the artifact.
        tar -cvf "benchmark-results-${BENCHMARK_JDK}.tar" `git diff --name-only` `git ls-files --others --exclude-standard`
    - name: Upload benchmark results
      uses: actions/upload-artifact@v2
      with:
        name: benchmark-results-${{ github.event.inputs.jdk }}-${{ matrix.split }}
        path: benchmark-results-${{ github.event.inputs.jdk }}.tar

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ abstract class BenchmarkBase {
val jdkString = if (version > 8) s"-jdk$version" else ""
val resultFileName =
s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffix-results.txt"
val prefix = args.headOption.map(_ + "/").getOrElse("")
val prefix = Benchmarks.currentProjectRoot.map(_ + "/").getOrElse("")
val file = new File(s"${prefix}benchmarks/$resultFileName")
if (!file.exists()) {
file.createNewFile()
Expand Down
99 changes: 79 additions & 20 deletions core/src/test/scala/org/apache/spark/benchmark/Benchmarks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.spark.benchmark

import java.io.File
import java.lang.reflect.Modifier
import java.nio.file.{FileSystems, Paths}
import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.Try
Expand All @@ -30,56 +32,113 @@ import com.google.common.reflect.ClassPath
*
* {{{
* 1. with spark-submit
* bin/spark-submit --class <this class> --jars <all spark test jars> <spark core test jar>
* bin/spark-submit --class <this class>
* --jars <all spark test jars>,<spark external package jars>
* <spark core test jar> <glob pattern for class> <extra arguments>
* 2. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class> --jars
* <all spark test jars> <spark core test jar>
* SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class>
* --jars <all spark test jars>,<spark external package jars>
* <spark core test jar> <glob pattern for class> <extra arguments>
* Results will be written to all corresponding files under "benchmarks/".
* Notice that it detects the sub-project's directories from jar's paths so the provided jars
* should be properly placed under target (Maven build) or target/scala-* (SBT) when you
* generate the files.
* }}}
*
* In Mac, you can use a command as below to find all the test jars.
* You can use a command as below to find all the test jars.
* Make sure not to select duplicate jars created by different versions of builds or tools.
* {{{
* find . -name "*3.2.0-SNAPSHOT-tests.jar" | paste -sd ',' -
* find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
* }}}
*
* Full command example:
* The example below runs all benchmarks and generates the results:
* {{{
* SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
* org.apache.spark.benchmark.Benchmarks --jars \
* "`find . -name "*3.2.0-SNAPSHOT-tests.jar" | paste -sd ',' -`" \
* ./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
* "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
* "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
* "*"
* }}}
*
* The example below runs all benchmarks under "org.apache.spark.sql.execution.datasources"
* {{{
* bin/spark-submit --class \
* org.apache.spark.benchmark.Benchmarks --jars \
* "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
* "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
* "org.apache.spark.sql.execution.datasources.*"
* }}}
*/

object Benchmarks {
  // Root directory of the sub-project that owns the benchmark currently being run.
  // `main` sets it right before invoking each benchmark; BenchmarkBase reads it to build the
  // path of the result file under "benchmarks/".
  var currentProjectRoot: Option[String] = None

  /**
   * Runs every benchmark class under "org.apache.spark" whose fully qualified name matches the
   * glob pattern given as the first argument.
   *
   * Behavior is tuned through environment variables:
   *  - SPARK_BENCHMARK_FAILFAST: when "false", a failing benchmark is logged and the run
   *    continues; otherwise (default) the failure is propagated.
   *  - SPARK_BENCHMARK_NUM_SPLITS: number of groups the matched benchmarks are divided into
   *    (default 1).
   *  - SPARK_BENCHMARK_CUR_SPLIT: 1-based index of the group to run (default: the first one).
   *
   * @param args the glob pattern for benchmark class names, followed by extra arguments that
   *             are forwarded to each benchmark's `main` method
   * @throws IllegalArgumentException if no pattern is given or the number of splits is not
   *                                  positive
   * @throws RuntimeException if no benchmark was selected to run
   */
  def main(args: Array[String]): Unit = {
    // Validate before anything else: `args.head` is needed to build the matcher below, and
    // would otherwise fail with an unhelpful NoSuchElementException.
    require(args.length > 0, "Benchmark class to run should be specified.")

    val isFailFast = sys.env.get(
      "SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true)
    val numOfSplits = sys.env.get(
      "SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1)
    // The split index is 1-based in the environment and 0-based internally.
    val currentSplit = sys.env.get(
      "SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0)
    // Guard the modulus below against division by zero.
    require(numOfSplits > 0,
      s"SPARK_BENCHMARK_NUM_SPLITS should be a positive integer but got $numOfSplits.")
    var numBenchmark = 0

    var isBenchmarkFound = false
    val benchmarkClasses = ClassPath.from(
      Thread.currentThread.getContextClassLoader
    ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
    val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")

    benchmarkClasses.foreach { info =>
      // Both are lazy so that a class is only loaded once its name has passed the cheap checks.
      lazy val clazz = info.load
      lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
      // isAssignableFrom seems not working with the reflected class from Guava's
      // getTopLevelClassesRecursive.
      if (
          info.getName.endsWith("Benchmark") &&
          // TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks
          !info.getName.endsWith("TPCDSQueryBenchmark") &&
          matcher.matches(Paths.get(info.getName)) &&
          Try(runBenchmark).isSuccess && // Does this have a main method?
          !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
      ) {
        // Every matched benchmark is counted, but only those in the current split are run.
        numBenchmark += 1
        if (numBenchmark % numOfSplits == currentSplit) {
          isBenchmarkFound = true

          val targetDirOrProjDir =
            new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
              .getParentFile.getParentFile

          // The root path to be referred in each benchmark.
          currentProjectRoot = Some {
            if (targetDirOrProjDir.getName == "target") {
              // SBT build
              targetDirOrProjDir.getParentFile.getCanonicalPath
            } else {
              // Maven build
              targetDirOrProjDir.getCanonicalPath
            }
          }

          // scalastyle:off println
          println(s"Running ${clazz.getName}:")
          // scalastyle:on println
          // Force GC to minimize the side effect.
          System.gc()
          try {
            runBenchmark.invoke(null, args.tail.toArray)
          } catch {
            // Only swallowed when fail-fast is disabled; otherwise the failure propagates.
            case e: Throwable if !isFailFast =>
              // scalastyle:off println
              println(s"${clazz.getName} failed with the exception below:")
              // scalastyle:on println
              e.printStackTrace()
          }
        }
      }
    }

    if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to run.")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,10 @@ object CoalescedRDDBenchmark extends BenchmarkBase {
coalescedRDD(numIters)
}
}

// Stops the SparkContext `sc` when it is non-null.
override def afterAll(): Unit = Option(sc).foreach(_.stop())
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
TaskContext.setTaskContext(taskContext)
f
sc.stop()
TaskContext.unset()
}

private def testRows(numRows: Int): Seq[UnsafeRow] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object SubExprEliminationBenchmark extends SqlBasedBenchmark {

withTempPath { path =>
prepareDataInfo(benchmark)
val numCols = 1000
val numCols = 500
val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)

val cols = (0 until numCols).map { idx =>
Expand Down Expand Up @@ -84,7 +84,7 @@ object SubExprEliminationBenchmark extends SqlBasedBenchmark {

withTempPath { path =>
prepareDataInfo(benchmark)
val numCols = 1000
val numCols = 500
val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)

val predicate = (0 until numCols).map { idx =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {

schema
}

// Calls stop() on `spark`.
override def afterAll(): Unit = spark.stop()
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,16 @@ object JsonBenchmark extends SqlBasedBenchmark {
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
val numIters = 3
runBenchmark("Benchmark for performance of JSON parsing") {
schemaInferring(100 * 1000 * 1000, numIters)
countShortColumn(100 * 1000 * 1000, numIters)
countWideColumn(10 * 1000 * 1000, numIters)
countWideRow(500 * 1000, numIters)
selectSubsetOfColumns(10 * 1000 * 1000, numIters)
jsonParserCreation(10 * 1000 * 1000, numIters)
jsonFunctions(10 * 1000 * 1000, numIters)
jsonInDS(50 * 1000 * 1000, numIters)
jsonInFile(50 * 1000 * 1000, numIters)
datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
schemaInferring(5 * 1000 * 1000, numIters)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk I had to reduce the size here. Otherwise, the GA job dies, complaining that there is no disk space left.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this, all benchmarks should pass now .. I will wait for the results before merging it in.

countShortColumn(5 * 1000 * 1000, numIters)
countWideColumn(1000 * 1000, numIters)
countWideRow(50 * 1000, numIters)
selectSubsetOfColumns(1000 * 1000, numIters)
jsonParserCreation(1000 * 1000, numIters)
jsonFunctions(1000 * 1000, numIters)
jsonInDS(5 * 1000 * 1000, numIters)
jsonInFile(5 * 1000 * 1000, numIters)
datetimeBenchmark(rowsNum = 1000 * 1000, numIters)
// Benchmark pushdown filters that refer to top-level columns.
// TODO (SPARK-32325): Add benchmarks for filters with nested column attributes.
filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
Expand Down