Commit 232efcc

Added column projection
1 parent 5f02f63 commit 232efcc

1 file changed: +192 −0

@@ -0,0 +1,192 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import com.google.gson.JsonStreamParser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.TrackingLocalFileSystem;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.iq80.snappy.SnappyInputStream;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(1)
public class ColumnProjectionBenchmark {

  @Param({ "github", "sales", "taxi"})
  public String Dataset;

  // @Param({"none", "snappy", "zlib"})
  @Param({"zlib"})
  public String compression;

  @AuxCounters
  @State(Scope.Thread)
  public static class ExtraCounters {
    long bytesRead;
    long reads;
    long records;
    long invocations;

    @Setup(Level.Iteration)
    public void clean() {
      bytesRead = 0;
      reads = 0;
      records = 0;
      invocations = 0;
    }

    @TearDown(Level.Iteration)
    public void print() {
      System.out.println();
      System.out.println("Reads: " + reads);
      System.out.println("Bytes: " + bytesRead);
      System.out.println("Records: " + records);
      System.out.println("Invocations: " + invocations);
    }

    public long kilobytes() {
      return bytesRead / 1024;
    }

    public long records() {
      return records;
    }
  }

  @Benchmark
  public void orc(ExtraCounters counters) throws Exception {
    Configuration conf = new Configuration();
    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
    fs.initialize(new URI("file:///"), conf);
    FileSystem.Statistics statistics = fs.getLocalStatistics();
    statistics.reset();
    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
    Path path = new Path("generated/" + Dataset + "-" + compression + ".orc");
    Reader reader = OrcFile.createReader(path, options);
    TypeDescription schema = reader.getSchema();
    boolean[] include = new boolean[schema.getMaximumId() + 1];
    // select first two columns
    List<TypeDescription> children = schema.getChildren();
    for(int c = children.get(0).getId(); c <= children.get(1).getMaximumId(); ++c) {
      include[c] = true;
    }
    RecordReader rows = reader.rows(new Reader.Options()
        .include(include));
    VectorizedRowBatch batch = schema.createRowBatch();
    while (rows.nextBatch(batch)) {
      counters.records += batch.size;
    }
    rows.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  @Benchmark
  public void parquet(ExtraCounters counters) throws Exception {
    JobConf conf = new JobConf();
    conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName());
    conf.set("fs.defaultFS", "track:///");
    if ("taxi".equals(Dataset)) {
      conf.set("columns", "vendor_id,pickup_time");
      conf.set("columns.types", "int,timestamp");
    } else if ("sales".equals(Dataset)) {
      conf.set("columns", "sales_id,customer_id");
      conf.set("columns.types", "bigint,bigint");
    } else if ("github".equals(Dataset)) {
      conf.set("columns", "actor,created_at");
      conf.set("columns.types", "struct<avatar_url:string,gravatar_id:string," +
          "id:int,login:string,url:string>,timestamp");
    } else {
      throw new IllegalArgumentException("Unknown data set " + Dataset);
    }
    Path path = new Path("generated/" + Dataset + "-" + compression + ".parquet");
    FileSystem.Statistics statistics = FileSystem.getStatistics("track:///",
        TrackingLocalFileSystem.class);
    statistics.reset();
    ParquetInputFormat<ArrayWritable> inputFormat =
        new ParquetInputFormat<>(DataWritableReadSupport.class);

    NullWritable nada = NullWritable.get();
    FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
    org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
        new ParquetRecordReaderWrapper(inputFormat, split, conf,
            Reporter.NULL);
    ArrayWritable value = recordReader.createValue();
    while (recordReader.next(nada, value)) {
      counters.records += 1;
    }
    recordReader.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  public static void main(String[] args) throws Exception {
    new Runner(new OptionsBuilder()
        .include(ColumnProjectionBenchmark.class.getSimpleName())
        .jvmArgs("-server", "-Xms256m", "-Xmx2g").build()
    ).run();
  }
}
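For reference, the ORC read path that the orc() benchmark above measures can also be used outside JMH. The sketch below is a minimal standalone version of the same column-projection pattern; the input path generated/taxi-zlib.orc is just one possible combination of the Dataset and compression parameters, and the class name ProjectionSketch is hypothetical. The schema walk and the Reader.Options().include(boolean[]) call mirror the benchmark code.

// Minimal sketch of the column-projection pattern used by orc() above.
// The file path and class name are illustrative, not part of the commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.util.List;

public class ProjectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Reader reader = OrcFile.createReader(new Path("generated/taxi-zlib.orc"),
        OrcFile.readerOptions(conf));
    TypeDescription schema = reader.getSchema();
    // Mark only the first two top-level columns (and their children) as included.
    boolean[] include = new boolean[schema.getMaximumId() + 1];
    List<TypeDescription> children = schema.getChildren();
    for (int c = children.get(0).getId(); c <= children.get(1).getMaximumId(); ++c) {
      include[c] = true;
    }
    RecordReader rows = reader.rows(new Reader.Options().include(include));
    VectorizedRowBatch batch = schema.createRowBatch();
    long records = 0;
    while (rows.nextBatch(batch)) {
      records += batch.size;
    }
    rows.close();
    System.out.println("records: " + records);
  }
}

Column IDs in an ORC TypeDescription are assigned in pre-order, so the range getId()..getMaximumId() of a top-level field covers that field together with all of its nested children; filling that contiguous range in the include array is what projects whole columns, including the nested struct fields in the github dataset.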
