@@ -21,25 +21,14 @@ import org.apache.spark.sql.SparkSession
2121import org .apache .spark .sql .internal .SQLConf
2222import org .apache .spark .util .Benchmark
2323
24- /**
25- * Benchmark to measure data source write performance.
26- * By default it measures 4 data source format: Parquet, ORC, JSON, CSV:
27- * spark-submit --class <this class> <spark sql test jar>
28- * To measure specified formats, run it with arguments:
29- * spark-submit --class <this class> <spark sql test jar> format1 [format2] [...]
30- */
31- object DataSourceWriteBenchmark {
24+ trait DataSourceWriteBenchmark {
3225 val conf = new SparkConf ()
3326 .setAppName(" DataSourceWriteBenchmark" )
3427 .setIfMissing(" spark.master" , " local[1]" )
35- .set(" spark.sql.parquet.compression.codec" , " snappy" )
36- .set(" spark.sql.orc.compression.codec" , " snappy" )
28+ .set(SQLConf .WHOLESTAGE_CODEGEN_ENABLED .key, " true" )
3729
3830 val spark = SparkSession .builder.config(conf).getOrCreate()
3931
40- // Set default configs. Individual cases will change them if necessary.
41- spark.conf.set(SQLConf .WHOLESTAGE_CODEGEN_ENABLED .key, " true" )
42-
4332 val tempTable = " temp"
4433 val numRows = 1024 * 1024 * 15
4534
@@ -86,64 +75,24 @@ object DataSourceWriteBenchmark {
8675 }
8776 }
8877
89- def main ( args : Array [ String ] ): Unit = {
78+ def runBenchmark ( format : String ): Unit = {
9079 val tableInt = " tableInt"
9180 val tableDouble = " tableDouble"
9281 val tableIntString = " tableIntString"
9382 val tablePartition = " tablePartition"
9483 val tableBucket = " tableBucket"
95- val formats : Seq [String ] = if (args.isEmpty) {
96- Seq (" Parquet" , " ORC" , " JSON" , " CSV" )
97- } else {
98- args
99- }
100- /*
101- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
102- Parquet writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
103- ------------------------------------------------------------------------------------------------
104- Output Single Int Column 1815 / 1932 8.7 115.4 1.0X
105- Output Single Double Column 1877 / 1878 8.4 119.3 1.0X
106- Output Int and String Column 6265 / 6543 2.5 398.3 0.3X
107- Output Partitions 4067 / 4457 3.9 258.6 0.4X
108- Output Buckets 5608 / 5820 2.8 356.6 0.3X
109-
110- ORC writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
111- ------------------------------------------------------------------------------------------------
112- Output Single Int Column 1201 / 1239 13.1 76.3 1.0X
113- Output Single Double Column 1542 / 1600 10.2 98.0 0.8X
114- Output Int and String Column 6495 / 6580 2.4 412.9 0.2X
115- Output Partitions 3648 / 3842 4.3 231.9 0.3X
116- Output Buckets 5022 / 5145 3.1 319.3 0.2X
117-
118- JSON writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
119- ------------------------------------------------------------------------------------------------
120- Output Single Int Column 1988 / 2093 7.9 126.4 1.0X
121- Output Single Double Column 2854 / 2911 5.5 181.4 0.7X
122- Output Int and String Column 6467 / 6653 2.4 411.1 0.3X
123- Output Partitions 4548 / 5055 3.5 289.1 0.4X
124- Output Buckets 5664 / 5765 2.8 360.1 0.4X
125-
126- CSV writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
127- ------------------------------------------------------------------------------------------------
128- Output Single Int Column 3025 / 3190 5.2 192.3 1.0X
129- Output Single Double Column 3575 / 3634 4.4 227.3 0.8X
130- Output Int and String Column 7313 / 7399 2.2 464.9 0.4X
131- Output Partitions 5105 / 5190 3.1 324.6 0.6X
132- Output Buckets 6986 / 6992 2.3 444.1 0.4X
133- */
13484 withTempTable(tempTable) {
13585 spark.range(numRows).createOrReplaceTempView(tempTable)
136- formats.foreach { format =>
137- withTable(tableInt, tableDouble, tableIntString, tablePartition, tableBucket) {
138- val benchmark = new Benchmark (s " $format writer benchmark " , numRows)
139- writeNumeric(tableInt, format, benchmark, " Int" )
140- writeNumeric(tableDouble, format, benchmark, " Double" )
141- writeIntString(tableIntString, format, benchmark)
142- writePartition(tablePartition, format, benchmark)
143- writeBucket(tableBucket, format, benchmark)
144- benchmark.run()
145- }
86+ withTable(tableInt, tableDouble, tableIntString, tablePartition, tableBucket) {
87+ val benchmark = new Benchmark (s " $format writer benchmark " , numRows)
88+ writeNumeric(tableInt, format, benchmark, " Int" )
89+ writeNumeric(tableDouble, format, benchmark, " Double" )
90+ writeIntString(tableIntString, format, benchmark)
91+ writePartition(tablePartition, format, benchmark)
92+ writeBucket(tableBucket, format, benchmark)
93+ benchmark.run()
14694 }
14795 }
14896 }
14997}
98+
0 commit comments