Commit 5259daf

Spark: Adds perf benchmarks for ZOrdering vs Sort Rewrite
1 parent d435242 commit 5259daf

1 file changed: +272 −0 lines changed
@@ -0,0 +1,272 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.spark.action;

import java.io.IOException;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.io.Files;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.expressions.Transform;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Timeout;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.current_date;
import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;

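// Each benchmark below runs in a single forked JVM as a single-shot
// measurement repeated for three iterations; the long timeout leaves room
// for full-table rewrites.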
@Fork(1)
@State(Scope.Benchmark)
@Measurement(iterations = 3)
@BenchmarkMode(Mode.SingleShotTime)
@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
public class IcebergSortCompactionBenchmark {

  private static final String[] NAMESPACE = new String[] {"default"};
  private static final String NAME = "sortbench";
  private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
  private static final int NUM_FILES = 8;
  private static final long NUM_ROWS = 10000000L;

  private final Configuration hadoopConf = initHadoopConf();
  private SparkSession spark;

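  // Lifecycle: Spark is started once per trial, while the table is dropped,
  // recreated, and reloaded before every iteration so each rewrite starts
  // from the same NUM_FILES unsorted data files.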
  @Setup
  public void setupBench() {
    setupSpark();
  }

  @TearDown
  public void teardownBench() {
    tearDownSpark();
  }

  @Setup(Level.Iteration)
  public void setupIteration() {
    initTable();
    appendData();
  }

  @TearDown(Level.Iteration)
  public void cleanUpIteration() throws IOException {
    cleanupFiles();
  }

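  // Sort-based rewrites: rewriteDataFiles with an explicit lexicographic
  // SortOrder over a single column, then four and six columns.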
  @Benchmark
  @Threads(1)
  public void sortInt() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .sort(SortOrder
            .builderFor(table().schema())
            .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST)
            .build())
        .execute();
  }

  @Benchmark
  @Threads(1)
  public void sortString() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .sort(SortOrder
            .builderFor(table().schema())
            .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST)
            .build())
        .execute();
  }

  @Benchmark
  @Threads(1)
  public void sortFourColumns() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .sort(SortOrder
            .builderFor(table().schema())
            .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST)
            .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST)
            .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST)
            .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST)
            .build())
        .execute();
  }

  @Benchmark
  @Threads(1)
  public void sortSixColumns() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .sort(SortOrder
            .builderFor(table().schema())
            .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST)
            .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST)
            .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST)
            .sortBy("timestampCol", SortDirection.DESC, NullOrder.NULLS_FIRST)
            .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST)
            .sortBy("longCol", SortDirection.DESC, NullOrder.NULLS_FIRST)
            .build())
        .execute();
  }

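  // Z-order rewrites: the same rewrite action, but clustering by a Z-order
  // curve over the equivalent column sets.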
  @Benchmark
  @Threads(1)
  public void zSortInt() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .zOrder("intCol")
        .execute();
  }

  @Benchmark
  @Threads(1)
  public void zSortString() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .zOrder("stringCol")
        .execute();
  }

  @Benchmark
  @Threads(1)
  public void zSortFourColumns() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .zOrder("stringCol", "intCol", "dateCol", "doubleCol")
        .execute();
  }

  @Benchmark
  @Threads(1)
  public void zSortSixColumns() {
    SparkActions.get()
        .rewriteDataFiles(table())
        .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol")
        .execute();
  }

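  // The benchmark table mixes required and optional numeric, temporal, and
  // string columns, matching the columns exercised above.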
  protected Configuration initHadoopConf() {
    return new Configuration();
  }

  protected final void initTable() {
    Schema schema = new Schema(
        required(1, "longCol", Types.LongType.get()),
        required(2, "intCol", Types.IntegerType.get()),
        required(3, "floatCol", Types.FloatType.get()),
        optional(4, "doubleCol", Types.DoubleType.get()),
        optional(6, "dateCol", Types.DateType.get()),
        optional(7, "timestampCol", Types.TimestampType.withZone()),
        optional(8, "stringCol", Types.StringType.get()));

    SparkSessionCatalog catalog = null;
    try {
      catalog = (SparkSessionCatalog)
          Spark3Util.catalogAndIdentifier(spark(), "spark_catalog").catalog();
      catalog.dropTable(IDENT);
      catalog.createTable(IDENT, SparkSchemaUtil.convert(schema), new Transform[0], Collections.emptyMap());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

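  // Generates NUM_FILES input splits of NUM_ROWS rows each; every other
  // column is derived deterministically from the increasing longCol.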
  private void appendData() {
    Dataset<Row> df = spark().range(0, NUM_ROWS * NUM_FILES, 1, NUM_FILES)
        .withColumnRenamed("id", "longCol")
        .withColumn("intCol", expr("CAST(longCol AS INT)"))
        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
        .withColumn("dateCol", date_add(current_date(), col("intCol").mod(NUM_FILES)))
        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
    writeData(df);
  }

  private void writeData(Dataset<Row> df) {
    df.write().format("iceberg").mode(SaveMode.Append).save(NAME);
  }

  protected final Table table() {
    try {
      return Spark3Util.loadIcebergTable(spark(), NAME);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  protected final SparkSession spark() {
    return spark;
  }

  protected String getCatalogWarehouse() {
    String location = Files.createTempDir().getAbsolutePath() + "/" + UUID.randomUUID() + "/";
    return location;
  }

  protected void cleanupFiles() throws IOException {
    spark.sql("DROP TABLE IF EXISTS " + NAME);
  }

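  // Runs Spark locally with the built-in session catalog replaced by
  // Iceberg's SparkSessionCatalog, backed by a Hadoop catalog in a
  // temporary warehouse directory.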
  protected void setupSpark() {
    SparkSession.Builder builder =
        SparkSession.builder()
            .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
            .config("spark.sql.catalog.spark_catalog.type", "hadoop")
            .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse())
            .master("local[*]");
    spark = builder.getOrCreate();
    Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
    hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue()));
  }

  protected void tearDownSpark() {
    spark.stop();
  }
}
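
These benchmarks are intended to be driven by the build's JMH integration, but JMH's programmatic runner can also launch the class directly for ad-hoc comparisons. A minimal sketch, assuming jmh-core and the compiled benchmark are on the classpath (the harness class below is illustrative and not part of this commit):

package org.apache.iceberg.spark.action;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Hypothetical ad-hoc harness for IcebergSortCompactionBenchmark.
public class SortCompactionBenchmarkRunner {
  public static void main(String[] args) throws RunnerException {
    Options opts = new OptionsBuilder()
        // Select all benchmarks in the class; narrow the regex to a single
        // method (e.g. "sortInt$") to compare one rewrite at a time.
        .include(IcebergSortCompactionBenchmark.class.getSimpleName())
        .build();
    new Runner(opts).run();
  }
}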
