diff --git a/.travis.yml b/.travis.yml index 3149d2f2f4..74d0f541a9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,8 @@ language: cpp compiler: - gcc - clang +jdk: + - openjdk7 before_install: - sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test - sudo apt-get -qq update @@ -12,5 +14,7 @@ before_script: - mkdir build - cd build - cmake .. +env: + - MAVEN_OPTS=-Xmx2g MAVEN_SKIP_RC=true script: - make package test-out diff --git a/java/bench/.gitignore b/java/bench/.gitignore new file mode 100644 index 0000000000..babcae653e --- /dev/null +++ b/java/bench/.gitignore @@ -0,0 +1,5 @@ +.*.crc +*.json.gz +*.avro +*.parquet +*.orc diff --git a/java/bench/fetch-data.sh b/java/bench/fetch-data.sh new file mode 100755 index 0000000000..90e0d0d7e7 --- /dev/null +++ b/java/bench/fetch-data.sh @@ -0,0 +1,6 @@ +#!/bin/bash +mkdir -p data/sources/taxi +(cd data/sources/taxi; wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-{11,12}.csv) +(cd data/sources/taxi; gzip *.csv) +mkdir -p data/sources/github +(cd data/sources/github; wget http://data.githubarchive.org/2015-11-{01..15}-{0..23}.json.gz) diff --git a/java/bench/pom.xml b/java/bench/pom.xml new file mode 100644 index 0000000000..66bbf7b1ad --- /dev/null +++ b/java/bench/pom.xml @@ -0,0 +1,159 @@ + + + + 4.0.0 + + org.apache.orc + orc + 1.4.0-SNAPSHOT + ../pom.xml + + + orc-benchmarks + jar + ORC Benchmarks + + Benchmarks for comparing ORC, Parquet, and Avro performance. + + + + + org.apache.orc + orc-core + + + + + com.fasterxml.jackson.core + jackson-core + + + com.google.code.gson + gson + + + commons-cli + commons-cli + + + io.airlift + aircompressor + + + org.apache.avro + avro + + + org.apache.avro + avro-mapred + hadoop2 + + + org.apache.commons + commons-csv + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + runtime + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hive + hive-common + + + org.apache.hive + hive-exec + core + + + org.apache.hive + hive-serde + + + org.apache.hive + hive-storage-api + + + org.apache.parquet + parquet-hadoop-bundle + + + org.jodd + jodd-core + + + org.openjdk.jmh + jmh-core + + + + + ${basedir}/src/java + ${basedir}/src/test + + + ${basedir}/src/test/resources + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + maven-assembly-plugin + + + + org.apache.orc.bench.Driver + + + + src/assembly/uber.xml + + + + + make-assembly + package + + single + + + + + + + + + + cmake + + ${build.dir}/bench + + + + diff --git a/java/bench/src/assembly/uber.xml b/java/bench/src/assembly/uber.xml new file mode 100644 index 0000000000..014eab951b --- /dev/null +++ b/java/bench/src/assembly/uber.xml @@ -0,0 +1,33 @@ + + + uber + + jar + + false + + + / + true + true + runtime + + + + + metaInf-services + + + diff --git a/java/bench/src/findbugs/exclude.xml b/java/bench/src/findbugs/exclude.xml new file mode 100644 index 0000000000..dde147124e --- /dev/null +++ b/java/bench/src/findbugs/exclude.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + diff --git a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java new file mode 100644 index 0000000000..0440495033 --- /dev/null +++ b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; + +public class TrackingLocalFileSystem extends RawLocalFileSystem { + + class TrackingFileInputStream extends RawLocalFileSystem.LocalFSFileInputStream { + public TrackingFileInputStream(Path f) throws IOException { + super(f); + } + + public int read() throws IOException { + statistics.incrementReadOps(1); + return super.read(); + } + + public int read(byte[] b, int off, int len) throws IOException { + statistics.incrementReadOps(1); + return super.read(b, off, len); + } + + public int read(long position, byte[] b, int off, int len) throws IOException { + statistics.incrementReadOps(1); + return super.read(position, b, off, len); + } + } + + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + if (!exists(f)) { + throw new FileNotFoundException(f.toString()); + } + return new FSDataInputStream(new BufferedFSInputStream( + new TrackingFileInputStream(f), bufferSize)); + } + + public FileSystem.Statistics getLocalStatistics() { + return statistics; + } +} diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java new file mode 100644 index 0000000000..b29b0b2f3b --- /dev/null +++ b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.TypeDescription; + +import java.util.List; + +/** + * Utilities that need the non-public methods from Hive. + */ +public class OrcBenchmarkUtilities { + + public static StructObjectInspector createObjectInspector(TypeDescription schema) { + List types = OrcUtils.getOrcTypes(schema); + return (StructObjectInspector) OrcStruct.createObjectInspector(0, types); + } + + public static Writable nextObject(VectorizedRowBatch batch, + TypeDescription schema, + int rowId, + Writable obj) { + OrcStruct result = (OrcStruct) obj; + if (result == null) { + result = new OrcStruct(batch.cols.length); + } + List childrenTypes = schema.getChildren(); + for(int c=0; c < batch.cols.length; ++c) { + result.setFieldValue(c, RecordReaderImpl.nextValue(batch.cols[c], rowId, + childrenTypes.get(c), result.getFieldValue(c))); + } + return result; + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java new file mode 100644 index 0000000000..4afaaf1d07 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS) +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Fork(1) +public class ColumnProjectionBenchmark { + + private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir"; + private static final Path root; + static { + String value = System.getProperty(ROOT_ENVIRONMENT_NAME); + root = value == null ? 
null : new Path(value); + } + + @Param({ "github", "sales", "taxi"}) + public String dataset; + + @Param({"none", "snappy", "zlib"}) + public String compression; + + @AuxCounters + @State(Scope.Thread) + public static class ExtraCounters { + long bytesRead; + long reads; + long records; + long invocations; + + @Setup(Level.Iteration) + public void clean() { + bytesRead = 0; + reads = 0; + records = 0; + invocations = 0; + } + + @TearDown(Level.Iteration) + public void print() { + System.out.println(); + System.out.println("Reads: " + reads); + System.out.println("Bytes: " + bytesRead); + System.out.println("Records: " + records); + System.out.println("Invocations: " + invocations); + } + + public long kilobytes() { + return bytesRead / 1024; + } + + public long records() { + return records; + } + } + + @Benchmark + public void orc(ExtraCounters counters) throws Exception{ + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs); + Path path = Utilities.getVariant(root, dataset, "orc", compression); + Reader reader = OrcFile.createReader(path, options); + TypeDescription schema = reader.getSchema(); + boolean[] include = new boolean[schema.getMaximumId() + 1]; + // select first two columns + List children = schema.getChildren(); + for(int c= children.get(0).getId(); c <= children.get(1).getMaximumId(); ++c) { + include[c] = true; + } + RecordReader rows = reader.rows(new Reader.Options() + .include(include)); + VectorizedRowBatch batch = schema.createRowBatch(); + while (rows.nextBatch(batch)) { + counters.records += batch.size; + } + rows.close(); + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + + @Benchmark + public void parquet(ExtraCounters counters) throws Exception { + JobConf conf = new JobConf(); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + if ("taxi".equals(dataset)) { + conf.set("columns", "vendor_id,pickup_time"); + conf.set("columns.types", "int,timestamp"); + } else if ("sales".equals(dataset)) { + conf.set("columns", "sales_id,customer_id"); + conf.set("columns.types", "bigint,bigint"); + } else if ("github".equals(dataset)) { + conf.set("columns", "actor,created_at"); + conf.set("columns.types", "struct,timestamp"); + } else { + throw new IllegalArgumentException("Unknown data set " + dataset); + } + Path path = Utilities.getVariant(root, dataset, "parquet", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + ParquetInputFormat inputFormat = + new ParquetInputFormat<>(DataWritableReadSupport.class); + + NullWritable nada = NullWritable.get(); + FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}); + org.apache.hadoop.mapred.RecordReader recordReader = + new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(nada, value)) { + counters.records += 1; + } + recordReader.close(); + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + public static void main(String[] args) throws Exception { + new 
Runner(new OptionsBuilder() + .include(ColumnProjectionBenchmark.class.getSimpleName()) + .jvmArgs("-server", "-Xms256m", "-Xmx2g", + "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build() + ).run(); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java new file mode 100644 index 0000000000..9274de3ea4 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import io.airlift.compress.snappy.SnappyCodec; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Enum for handling the compression codecs for the benchmark + */ +public enum CompressionKind { + NONE(".none"), + ZLIB(".gz"), + SNAPPY(".snappy"); + + CompressionKind(String extendsion) { + this.extension = extendsion; + } + + private final String extension; + + public String getExtension() { + return extension; + } + + public OutputStream create(OutputStream out) throws IOException { + switch (this) { + case NONE: + return out; + case ZLIB: + return new GZIPOutputStream(out); + case SNAPPY: + return new SnappyCodec().createOutputStream(out); + default: + throw new IllegalArgumentException("Unhandled kind " + this); + } + } + + public InputStream read(InputStream in) throws IOException { + switch (this) { + case NONE: + return in; + case ZLIB: + return new GZIPInputStream(in); + case SNAPPY: + return new SnappyCodec().createInputStream(in); + default: + throw new IllegalArgumentException("Unhandled kind " + this); + } + } + + public static CompressionKind fromPath(Path path) { + String name = path.getName(); + int lastDot = name.lastIndexOf('.'); + if (lastDot >= 0) { + String ext = name.substring(lastDot); + for (CompressionKind value : values()) { + if (ext.equals(value.getExtension())) { + return value; + } + } + } + return NONE; + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/Driver.java b/java/bench/src/java/org/apache/orc/bench/Driver.java new file mode 100644 index 0000000000..c8f159272e --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/Driver.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.bench.convert.GenerateVariants; +import org.apache.orc.bench.convert.ScanVariants; + +import java.util.Arrays; + +/** + * A driver tool to call the various benchmark classes. + */ +public class Driver { + + static CommandLine parseCommandLine(String[] args) throws ParseException { + Options options = new Options() + .addOption("h", "help", false, "Provide help") + .addOption("D", "define", true, "Change configuration settings"); + CommandLine result = new DefaultParser().parse(options, args, true); + if (result.hasOption("help") || result.getArgs().length == 0) { + new HelpFormatter().printHelp("benchmark ", options); + System.err.println(); + System.err.println("Commands:"); + System.err.println(" generate - Generate data variants"); + System.err.println(" scan - Scan data variants"); + System.err.println(" read-all - Full table scan benchmark"); + System.err.println(" read-some - Column projection benchmark"); + System.exit(1); + } + return result; + } + + public static void main(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + args = cli.getArgs(); + String command = args[0]; + args = Arrays.copyOfRange(args, 1, args.length); + switch (command) { + case "generate": + GenerateVariants.main(args); + break; + case "scan": + ScanVariants.main(args); + break; + case "read-all": + FullReadBenchmark.main(args); + break; + case "read-some": + ColumnProjectionBenchmark.main(args); + break; + default: + System.err.println("Unknown command " + command); + System.exit(1); + } + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java new file mode 100644 index 0000000000..952f18dd88 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import com.google.gson.JsonStreamParser; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS) +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Fork(1) +public class FullReadBenchmark { + + private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir"; + private static final Path root; + static { + String value = System.getProperty(ROOT_ENVIRONMENT_NAME); + root = value == null ? 
null : new Path(value); + } + + @Param({"taxi", "sales", "github"}) + public String dataset; + + @Param({"none", "zlib", "snappy"}) + public String compression; + + @AuxCounters + @State(Scope.Thread) + public static class ExtraCounters { + long bytesRead; + long reads; + long records; + long invocations; + + @Setup(Level.Iteration) + public void clean() { + bytesRead = 0; + reads = 0; + records = 0; + invocations = 0; + } + + @TearDown(Level.Iteration) + public void print() { + System.out.println(); + System.out.println("Reads: " + reads); + System.out.println("Bytes: " + bytesRead); + System.out.println("Records: " + records); + System.out.println("Invocations: " + invocations); + } + + public long kilobytes() { + return bytesRead / 1024; + } + + public long records() { + return records; + } + } + + @Benchmark + public void orc(ExtraCounters counters) throws Exception{ + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs); + Path path = Utilities.getVariant(root, dataset, "orc", compression); + Reader reader = OrcFile.createReader(path, options); + TypeDescription schema = reader.getSchema(); + RecordReader rows = reader.rows(); + VectorizedRowBatch batch = schema.createRowBatch(); + while (rows.nextBatch(batch)) { + counters.records += batch.size; + } + rows.close(); + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + + @Benchmark + public void avro(ExtraCounters counters) throws Exception { + Configuration conf = new Configuration(); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + Path path = Utilities.getVariant(root, dataset, "avro", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + FsInput file = new FsInput(path, conf); + DatumReader datumReader = new GenericDatumReader<>(); + DataFileReader dataFileReader = + new DataFileReader<>(file, datumReader); + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + counters.records += 1; + } + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + + @Benchmark + public void parquet(ExtraCounters counters) throws Exception { + JobConf conf = new JobConf(); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + Path path = Utilities.getVariant(root, dataset, "parquet", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + ParquetInputFormat inputFormat = + new ParquetInputFormat<>(DataWritableReadSupport.class); + + NullWritable nada = NullWritable.get(); + FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}); + org.apache.hadoop.mapred.RecordReader recordReader = + new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(nada, value)) { + counters.records += 1; + } + recordReader.close(); + counters.bytesRead += statistics.getBytesRead(); + counters.reads += 
statistics.getReadOps(); + counters.invocations += 1; + } + + @Benchmark + public void json(ExtraCounters counters) throws Exception { + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + Path path = Utilities.getVariant(root, dataset, "json", compression); + CompressionKind compress = CompressionKind.valueOf(compression); + InputStream input = compress.read(fs.open(path)); + JsonStreamParser parser = + new JsonStreamParser(new InputStreamReader(input, + StandardCharsets.UTF_8)); + while (parser.hasNext()) { + parser.next(); + counters.records += 1; + } + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + + public static void main(String[] args) throws Exception { + new Runner(new OptionsBuilder() + .include(FullReadBenchmark.class.getSimpleName()) + .addProfiler("hs_gc") + .jvmArgs("-server", "-Xms256m", "-Xmx2g", + "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build() + ).run(); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/RandomGenerator.java b/java/bench/src/java/org/apache/orc/bench/RandomGenerator.java new file mode 100644 index 0000000000..dfe7d43b25 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/RandomGenerator.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class RandomGenerator { + private final TypeDescription schema = TypeDescription.createStruct(); + private final List fields = new ArrayList<>(); + private final Random random; + + public RandomGenerator(int seed) { + random = new Random(seed); + } + + private abstract class ValueGenerator { + double nullProbability = 0; + abstract void generate(ColumnVector vector, int valueCount); + } + + private class RandomBoolean extends ValueGenerator { + public void generate(ColumnVector v, int valueCount) { + LongColumnVector vector = (LongColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + vector.vector[r] = random.nextInt(2); + } + } + } + } + + private class RandomList extends ValueGenerator { + private final int minSize; + private final int sizeRange; + private final Field child; + + public RandomList(int minSize, int maxSize, Field child) { + this.minSize = minSize; + this.sizeRange = maxSize - minSize + 1; + this.child = child; + } + + public void generate(ColumnVector v, int valueCount) { + ListColumnVector vector = (ListColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + vector.offsets[r] = vector.childCount; + vector.lengths[r] = random.nextInt(sizeRange) + minSize; + vector.childCount += vector.lengths[r]; + } + } + vector.child.ensureSize(vector.childCount, false); + child.generator.generate(vector.child, vector.childCount); + } + } + + private class RandomStruct extends ValueGenerator { + private final Field[] children; + + public RandomStruct(Field[] children) { + this.children = children; + } + + public void generate(ColumnVector v, int valueCount) { + StructColumnVector vector = (StructColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } + } + for(int c=0; c < children.length; ++c) { + children[c].generator.generate(vector.fields[c], valueCount); + } + } + } + + private abstract class IntegerGenerator extends ValueGenerator { + private final long sign; + private final long mask; + + private IntegerGenerator(TypeDescription.Category kind) { + int bits = getIntegerLength(kind); + mask = bits == 64 ? 
0 : -1L << bits; + sign = 1L << (bits - 1); + } + + protected void normalize(LongColumnVector vector, int valueCount) { + // make sure the value stays in range by sign extending it + for(int r=0; r < valueCount; ++r) { + if ((vector.vector[r] & sign) == 0) { + vector.vector[r] &= ~mask; + } else { + vector.vector[r] |= mask; + } + } + } + } + + private class AutoIncrement extends IntegerGenerator { + private long value; + private final long increment; + + private AutoIncrement(TypeDescription.Category kind, long start, + long increment) { + super(kind); + this.value = start; + this.increment = increment; + } + + public void generate(ColumnVector v, int valueCount) { + LongColumnVector vector = (LongColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() >= nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + vector.vector[r] = value; + value += increment; + } + } + normalize(vector, valueCount); + } + } + + private class RandomInteger extends IntegerGenerator { + + private RandomInteger(TypeDescription.Category kind) { + super(kind); + } + + public void generate(ColumnVector v, int valueCount) { + LongColumnVector vector = (LongColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + vector.vector[r] = random.nextLong(); + } + } + normalize(vector, valueCount); + } + } + + private class IntegerRange extends IntegerGenerator { + private final long minimum; + private final long range; + private final long limit; + + private IntegerRange(TypeDescription.Category kind, long minimum, + long maximum) { + super(kind); + this.minimum = minimum; + this.range = maximum - minimum + 1; + if (this.range < 0) { + throw new IllegalArgumentException("Can't support a negative range " + + range); + } + limit = (Long.MAX_VALUE / range) * range; + } + + public void generate(ColumnVector v, int valueCount) { + LongColumnVector vector = (LongColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + long rand; + do { + // clear the sign bit + rand = random.nextLong() & Long.MAX_VALUE; + } while (rand >= limit); + vector.vector[r] = (rand % range) + minimum; + } + } + normalize(vector, valueCount); + } + } + + private class StringChooser extends ValueGenerator { + private final byte[][] choices; + private StringChooser(String[] values) { + choices = new byte[values.length][]; + for(int e=0; e < values.length; ++e) { + choices[e] = values[e].getBytes(StandardCharsets.UTF_8); + } + } + + public void generate(ColumnVector v, int valueCount) { + BytesColumnVector vector = (BytesColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + int val = random.nextInt(choices.length); + vector.setRef(r, choices[val], 0, choices[val].length); + } + } + } + } + + private static byte[] concat(byte[] left, byte[] right) { + byte[] result = new byte[left.length + right.length]; + System.arraycopy(left, 0, result, 0, left.length); + System.arraycopy(right, 0, result, left.length, right.length); + return result; + } + + private static byte pickOne(byte[] choices, Random random) { + return choices[random.nextInt(choices.length)]; + } + + private static final byte[] LOWER_CONSONANTS = + 
"bcdfghjklmnpqrstvwxyz".getBytes(StandardCharsets.UTF_8); + private static final byte[] UPPER_CONSONANTS = + "BCDFGHJKLMNPQRSTVWXYZ".getBytes(StandardCharsets.UTF_8); + private static final byte[] CONSONANTS = + concat(LOWER_CONSONANTS, UPPER_CONSONANTS); + private static final byte[] LOWER_VOWELS = "aeiou".getBytes(StandardCharsets.UTF_8); + private static final byte[] UPPER_VOWELS = "AEIOU".getBytes(StandardCharsets.UTF_8); + private static final byte[] VOWELS = concat(LOWER_VOWELS, UPPER_VOWELS); + private static final byte[] LOWER_LETTERS = + concat(LOWER_CONSONANTS, LOWER_VOWELS); + private static final byte[] UPPER_LETTERS = + concat(UPPER_CONSONANTS, UPPER_VOWELS); + private static final byte[] LETTERS = concat(LOWER_LETTERS, UPPER_LETTERS); + private static final byte[] NATURAL_DIGITS = "123456789".getBytes(StandardCharsets.UTF_8); + private static final byte[] DIGITS = "0123456789".getBytes(StandardCharsets.UTF_8); + + private class StringPattern extends ValueGenerator { + private final byte[] buffer; + private final byte[][] choices; + private final int[] locations; + + private StringPattern(String pattern) { + buffer = pattern.getBytes(StandardCharsets.UTF_8); + int locs = 0; + for(int i=0; i < buffer.length; ++i) { + switch (buffer[i]) { + case 'C': + case 'c': + case 'E': + case 'V': + case 'v': + case 'F': + case 'l': + case 'L': + case 'D': + case 'x': + case 'X': + locs += 1; + break; + default: + break; + } + } + locations = new int[locs]; + choices = new byte[locs][]; + locs = 0; + for(int i=0; i < buffer.length; ++i) { + switch (buffer[i]) { + case 'C': + locations[locs] = i; + choices[locs++] = UPPER_CONSONANTS; + break; + case 'c': + locations[locs] = i; + choices[locs++] = LOWER_CONSONANTS; + break; + case 'E': + locations[locs] = i; + choices[locs++] = CONSONANTS; + break; + case 'V': + locations[locs] = i; + choices[locs++] = UPPER_VOWELS; + break; + case 'v': + locations[locs] = i; + choices[locs++] = LOWER_VOWELS; + break; + case 'F': + locations[locs] = i; + choices[locs++] = VOWELS; + break; + case 'l': + locations[locs] = i; + choices[locs++] = LOWER_LETTERS; + break; + case 'L': + locations[locs] = i; + choices[locs++] = UPPER_LETTERS; + break; + case 'D': + locations[locs] = i; + choices[locs++] = LETTERS; + break; + case 'x': + locations[locs] = i; + choices[locs++] = NATURAL_DIGITS; + break; + case 'X': + locations[locs] = i; + choices[locs++] = DIGITS; + break; + default: + break; + } + } + } + + public void generate(ColumnVector v, int valueCount) { + BytesColumnVector vector = (BytesColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + for(int m=0; m < locations.length; ++m) { + buffer[locations[m]] = pickOne(choices[m], random); + } + vector.setVal(r, buffer, 0, buffer.length); + } + } + } + } + + private class TimestampRange extends ValueGenerator { + private final long minimum; + private final long range; + private final long limit; + + private TimestampRange(String min, String max) { + minimum = Timestamp.valueOf(min).getTime(); + range = Timestamp.valueOf(max).getTime() - minimum + 1; + if (range < 0) { + throw new IllegalArgumentException("Negative range " + range); + } + limit = (Long.MAX_VALUE / range) * range; + } + + public void generate(ColumnVector v, int valueCount) { + TimestampColumnVector vector = (TimestampColumnVector) v; + for(int r=0; r < valueCount; ++r) { + if (nullProbability != 0 && random.nextDouble() < 
nullProbability) { + v.noNulls = false; + v.isNull[r] = true; + } else { + long rand; + do { + // clear the sign bit + rand = random.nextLong() & Long.MAX_VALUE; + } while (rand >= limit); + vector.time[r] = (rand % range) + minimum; + vector.nanos[r] = random.nextInt(1000000); + } + } + } + } + + private static int getIntegerLength(TypeDescription.Category kind) { + switch (kind) { + case BYTE: + return 8; + case SHORT: + return 16; + case INT: + return 32; + case LONG: + return 64; + default: + throw new IllegalArgumentException("Unhandled type " + kind); + } + } + + public class Field { + private final TypeDescription type; + private Field[] children; + private ValueGenerator generator; + + private Field(TypeDescription type) { + this.type = type; + if (!type.getCategory().isPrimitive()) { + List childrenTypes = type.getChildren(); + children = new Field[childrenTypes.size()]; + for(int c=0; c < children.length; ++c) { + children[c] = new Field(childrenTypes.get(c)); + } + } + } + + public Field addAutoIncrement(long start, long increment) { + generator = new AutoIncrement(type.getCategory(), start, increment); + return this; + } + + public Field addIntegerRange(long min, long max) { + generator = new IntegerRange(type.getCategory(), min, max); + return this; + } + + public Field addRandomInt() { + generator = new RandomInteger(type.getCategory()); + return this; + } + + public Field addStringChoice(String... choices) { + if (type.getCategory() != TypeDescription.Category.STRING) { + throw new IllegalArgumentException("Must be string - " + type); + } + generator = new StringChooser(choices); + return this; + } + + public Field addStringPattern(String pattern) { + if (type.getCategory() != TypeDescription.Category.STRING) { + throw new IllegalArgumentException("Must be string - " + type); + } + generator = new StringPattern(pattern); + return this; + } + + public Field addTimestampRange(String start, String end) { + if (type.getCategory() != TypeDescription.Category.TIMESTAMP) { + throw new IllegalArgumentException("Must be timestamp - " + type); + } + generator = new TimestampRange(start, end); + return this; + } + + public Field addBoolean() { + if (type.getCategory() != TypeDescription.Category.BOOLEAN) { + throw new IllegalArgumentException("Must be boolean - " + type); + } + generator = new RandomBoolean(); + return this; + } + + public Field hasNulls(double probability) { + generator.nullProbability = probability; + return this; + } + + public Field addStruct() { + generator = new RandomStruct(children); + return this; + } + + public Field addList(int minSize, int maxSize) { + generator = new RandomList(minSize, maxSize, children[0]); + return this; + } + + public Field getChildField(int child) { + return children[child]; + } + } + + public Field addField(String name, TypeDescription.Category kind) { + TypeDescription type = new TypeDescription(kind); + return addField(name, type); + } + + public Field addField(String name, TypeDescription type) { + schema.addField(name, type); + Field result = new Field(type); + fields.add(result); + return result; + } + + public void generate(VectorizedRowBatch batch, int rowCount) { + batch.reset(); + for(int c=0; c < batch.cols.length; ++c) { + fields.get(c).generator.generate(batch.cols[c], rowCount); + } + batch.size = rowCount; + } + + /** + * Get the schema for the table that is being generated. 
+ * @return + */ + public TypeDescription getSchema() { + return schema; + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/SalesGenerator.java b/java/bench/src/java/org/apache/orc/bench/SalesGenerator.java new file mode 100644 index 0000000000..2be3537ed1 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/SalesGenerator.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchReader; + +public class SalesGenerator implements BatchReader { + private final RandomGenerator generator; + private long rowsRemaining; + private final static double MOSTLY = 0.99999; + + public SalesGenerator(long rows) { + this(rows, 42); + } + + public SalesGenerator(long rows, int seed) { + generator = new RandomGenerator(seed); + // column 1 + generator.addField("sales_id", TypeDescription.Category.LONG) + .addAutoIncrement(1000000000, 1); + generator.addField("customer_id", TypeDescription.Category.LONG) + .addIntegerRange(1000000000, 2000000000); + generator.addField("col3", TypeDescription.Category.LONG) + .addIntegerRange(1, 10000).hasNulls(0.9993100389335173); + + // column 4 + generator.addField("item_category", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000).hasNulls(0.00014784879996054823); + generator.addField("item_count", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000); + generator.addField("change_ts", TypeDescription.Category.TIMESTAMP) + .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59"); + + // column 7 + generator.addField("store_location", TypeDescription.Category.STRING) + .addStringChoice("Los Angeles", "New York", "Cupertino", "Sunnyvale", + "Boston", "Chicago", "Seattle", "Jackson", + "Palo Alto", "San Mateo", "San Jose", "Santa Clara", + "Irvine", "Torrance", "Gardena", "Hermosa", "Manhattan") + .hasNulls(0.0004928293332019384); + generator.addField("associate_id", TypeDescription.Category.STRING) + .addStringPattern("MR V").hasNulls(0.05026859198659506); + generator.addField("col9", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000000).hasNulls(MOSTLY); + + // column 10 + generator.addField("rebate_id", TypeDescription.Category.STRING) + .addStringPattern("xxxxxx").hasNulls(MOSTLY); + generator.addField("create_ts", TypeDescription.Category.TIMESTAMP) + .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59"); + generator.addField("col13", TypeDescription.Category.LONG) + .addIntegerRange(1, 100000).hasNulls(MOSTLY); + + // column 13 + generator.addField("size", TypeDescription.Category.STRING) + .addStringChoice("Small", "Medium", "Large", "XL") + 
.hasNulls(0.9503720861465674); + generator.addField("col14", TypeDescription.Category.LONG) + .addIntegerRange(1, 100000); + generator.addField("fulfilled", TypeDescription.Category.BOOLEAN) + .addBoolean(); + + // column 16 + generator.addField("global_id", TypeDescription.Category.STRING) + .addStringPattern("xxxxxxxxxxxxxxxx").hasNulls(0.021388793060962974); + generator.addField("col17", TypeDescription.Category.STRING) + .addStringPattern("L-xx").hasNulls(MOSTLY); + generator.addField("col18", TypeDescription.Category.STRING) + .addStringPattern("ll").hasNulls(MOSTLY); + + // column 19 + generator.addField("col19", TypeDescription.Category.LONG) + .addIntegerRange(1, 100000); + generator.addField("has_rebate", TypeDescription.Category.BOOLEAN) + .addBoolean(); + RandomGenerator.Field list = + generator.addField("col21", + TypeDescription.fromString("array>")) + .addList(0, 3) + .hasNulls(MOSTLY); + RandomGenerator.Field struct = list.getChildField(0).addStruct(); + struct.getChildField(0).addIntegerRange(0, 10000000); + struct.getChildField(1).addStringPattern("VVVVV"); + struct.getChildField(2).addStringPattern("VVVVVVVV"); + struct.getChildField(3).addIntegerRange(0, 10000000); + struct.getChildField(4).addIntegerRange(0, 10000000); + struct.getChildField(5).addStringPattern("VVVVVVVV"); + + // column 38 + generator.addField("vendor_id", TypeDescription.Category.STRING) + .addStringPattern("Lxxxxxx").hasNulls(0.1870780148834459); + generator.addField("country", TypeDescription.Category.STRING) + .addStringChoice("USA", "Germany", "Ireland", "Canada", "Mexico", + "Denmark").hasNulls(0.0004928293332019384); + + // column 40 + generator.addField("backend_version", TypeDescription.Category.STRING) + .addStringPattern("X.xx").hasNulls(0.0005913951998423039); + generator.addField("col41", TypeDescription.Category.LONG) + .addIntegerRange(1000000000, 100000000000L); + generator.addField("col42", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000000); + + // column 43 + generator.addField("col43", TypeDescription.Category.LONG) + .addIntegerRange(1000000000, 10000000000L).hasNulls(0.9763934749396284); + generator.addField("col44", TypeDescription.Category.LONG) + .addIntegerRange(1, 100000000); + generator.addField("col45", TypeDescription.Category.LONG) + .addIntegerRange(1, 100000000); + + // column 46 + generator.addField("col46", TypeDescription.Category.LONG) + .addIntegerRange(1, 10000000); + generator.addField("col47", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000); + generator.addField("col48", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000).hasNulls(MOSTLY); + + // column 49 + generator.addField("col49", TypeDescription.Category.STRING) + .addStringPattern("xxxx").hasNulls(0.0004928293332019384); + generator.addField("col50", TypeDescription.Category.STRING) + .addStringPattern("ll").hasNulls(0.9496821250800848); + generator.addField("col51", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000).hasNulls(0.9999014341333596); + + // column 52 + generator.addField("col52", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000).hasNulls(0.9980779656005125); + generator.addField("col53", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000000); + generator.addField("col54", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000000); + + // column 55 + generator.addField("col55", TypeDescription.Category.STRING) + .addStringChoice("X"); + generator.addField("col56", TypeDescription.Category.TIMESTAMP) + 
.addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59"); + generator.addField("col57", TypeDescription.Category.TIMESTAMP) + .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59"); + + // column 58 + generator.addField("md5", TypeDescription.Category.LONG) + .addRandomInt(); + generator.addField("col59", TypeDescription.Category.LONG) + .addIntegerRange(1000000000, 10000000000L); + generator.addField("col69", TypeDescription.Category.TIMESTAMP) + .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59") + .hasNulls(MOSTLY); + + // column 61 + generator.addField("col61", TypeDescription.Category.STRING) + .addStringPattern("X.xx").hasNulls(0.11399142476960233); + generator.addField("col62", TypeDescription.Category.STRING) + .addStringPattern("X.xx").hasNulls(0.9986200778670347); + generator.addField("col63", TypeDescription.Category.TIMESTAMP) + .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59"); + + // column 64 + generator.addField("col64", TypeDescription.Category.LONG) + .addIntegerRange(1, 1000000).hasNulls(MOSTLY); + rowsRemaining = rows; + } + + public boolean nextBatch(VectorizedRowBatch batch) { + int rows = (int) Math.min(batch.getMaxSize(), rowsRemaining); + generator.generate(batch, rows); + rowsRemaining -= rows; + return rows != 0; + } + + @Override + public void close() { + // PASS + } + + public TypeDescription getSchema() { + return generator.getSchema(); + } + + public static void main(String[] args) throws Exception { + SalesGenerator sales = new SalesGenerator(10, 42); + System.out.println("Schema " + sales.getSchema()); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/Utilities.java b/java/bench/src/java/org/apache/orc/bench/Utilities.java new file mode 100644 index 0000000000..7016f5e07b --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/Utilities.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Properties; + +public class Utilities { + + public static TypeDescription loadSchema(String name) throws IOException { + InputStream in = Utilities.class.getClassLoader().getResourceAsStream(name); + byte[] buffer= new byte[1 * 1024]; + int len = in.read(buffer); + StringBuilder string = new StringBuilder(); + while (len > 0) { + for(int i=0; i < len; ++i) { + // strip out + if (buffer[i] != '\n' && buffer[i] != ' ') { + string.append((char) buffer[i]); + } + } + len = in.read(buffer); + } + return TypeDescription.fromString(string.toString()); + } + + public static org.apache.orc.CompressionKind getCodec(CompressionKind compression) { + switch (compression) { + case NONE: + return org.apache.orc.CompressionKind.NONE; + case ZLIB: + return org.apache.orc.CompressionKind.ZLIB; + case SNAPPY: + return org.apache.orc.CompressionKind.SNAPPY; + default: + throw new IllegalArgumentException("Unknown compression " + compression); + } + } + + public static Iterable sliceArray(final String[] array, + final int start) { + return new Iterable() { + String[] values = array; + int posn = start; + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return posn < values.length; + } + + @Override + public String next() { + if (posn >= values.length) { + throw new NoSuchElementException("Index off end of array." + + " index = " + posn + " length = " + values.length); + } else { + return values[posn++]; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("No remove"); + } + }; + } + }; + } + + public static Properties convertSchemaToHiveConfig(TypeDescription schema) { + Properties result = new Properties(); + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Hive requires struct root types" + + " instead of " + schema); + } + StringBuilder columns = new StringBuilder(); + StringBuilder types = new StringBuilder(); + List columnNames = schema.getFieldNames(); + List columnTypes = schema.getChildren(); + for(int c=0; c < columnNames.size(); ++c) { + if (c != 0) { + columns.append(","); + types.append(","); + } + columns.append(columnNames.get(c)); + types.append(columnTypes.get(c)); + } + result.setProperty(serdeConstants.LIST_COLUMNS, columns.toString()); + result.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString()); + return result; + } + + public static Path getVariant(Path root, + String data, + String format, + String compress) { + return new Path(root, "generated/" + data + "/" + format + "." + compress); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java b/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java new file mode 100644 index 0000000000..b9ea3567ef --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; + +/** + * Generic interface for reading data. + */ +public interface BatchReader extends AutoCloseable { + + boolean nextBatch(VectorizedRowBatch batch) throws IOException; + + @Override + void close() throws IOException; +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java new file mode 100644 index 0000000000..c79d93736b --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; + +/** + * Generic interface for writing data. + */ +public interface BatchWriter extends AutoCloseable { + + void writeBatch(VectorizedRowBatch batch) throws IOException; + + @Override + void close() throws IOException; +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java b/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java new file mode 100644 index 0000000000..7f57468820 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.SalesGenerator; +import org.apache.orc.bench.Utilities; +import org.apache.orc.bench.convert.avro.AvroReader; +import org.apache.orc.bench.convert.avro.AvroWriter; +import org.apache.orc.bench.convert.csv.CsvReader; +import org.apache.orc.bench.convert.json.JsonReader; +import org.apache.orc.bench.convert.json.JsonWriter; +import org.apache.orc.bench.convert.orc.OrcReader; +import org.apache.orc.bench.convert.orc.OrcWriter; +import org.apache.orc.bench.convert.parquet.ParquetReader; +import org.apache.orc.bench.convert.parquet.ParquetWriter; + +import java.io.IOException; + +/** + * A tool to create the different variants that we need to benchmark against. + */ +public class GenerateVariants { + + public static BatchWriter createFileWriter(Path file, + String format, + TypeDescription schema, + Configuration conf, + CompressionKind compress + ) throws IOException { + FileSystem fs = file.getFileSystem(conf); + fs.delete(file, false); + fs.mkdirs(file.getParent()); + switch (format) { + case "json": + return new JsonWriter(file, schema, conf, compress); + case "orc": + return new OrcWriter(file, schema, conf, compress); + case "avro": + return new AvroWriter(file, schema, conf, compress); + case "parquet": + return new ParquetWriter(file, schema, conf, compress); + default: + throw new IllegalArgumentException("Unknown format " + format); + } + } + + public static BatchReader createFileReader(Path file, + String format, + TypeDescription schema, + Configuration conf, + CompressionKind compress + ) throws IOException { + switch (format) { + case "csv": + return new CsvReader(file, schema, conf, compress); + case "json": + return new JsonReader(file, schema, conf, compress); + case "orc": + return new OrcReader(file, schema, conf); + case "avro": + return new AvroReader(file, schema, conf); + case "parquet": + return new ParquetReader(file, schema, conf); + default: + throw new IllegalArgumentException("Unknown format " + format); + } + } + + static class RecursiveReader implements BatchReader { + private final RemoteIterator filenames; + private final String format; + private final TypeDescription schema; + private final Configuration conf; + private final CompressionKind compress; + private BatchReader current = null; + + RecursiveReader(Path root, + String format, + TypeDescription schema, + Configuration conf, + CompressionKind compress) throws IOException { + FileSystem fs = root.getFileSystem(conf); + filenames = fs.listFiles(root, true); + this.format = format; + this.schema = schema; + this.conf = conf; + 
this.compress = compress; + } + + @Override + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + while (current == null || !current.nextBatch(batch)) { + if (filenames.hasNext()) { + LocatedFileStatus next = filenames.next(); + if (next.isFile()) { + current = createFileReader(next.getPath(), format, schema, conf, + compress); + } + } else { + return false; + } + } + return true; + } + + @Override + public void close() throws IOException { + if (current != null) { + current.close(); + } + } + } + + public static BatchReader createReader(Path root, + String dataName, + TypeDescription schema, + Configuration conf, + long salesRecords) throws IOException { + switch (dataName) { + case "taxi": + return new RecursiveReader(new Path(root, "sources/" + dataName), "csv", + schema, conf, CompressionKind.ZLIB); + case "sales": + return new SalesGenerator(salesRecords); + case "github": + return new RecursiveReader(new Path(root, "sources/" + dataName), "json", + schema, conf, CompressionKind.ZLIB); + default: + throw new IllegalArgumentException("Unknown data name " + dataName); + } + } + + static CommandLine parseCommandLine(String[] args) throws ParseException { + Options options = new Options() + .addOption("h", "help", false, "Provide help") + .addOption("c", "compress", true, "List of compression") + .addOption("d", "data", true, "List of data sets") + .addOption("f", "format", true, "List of formats") + .addOption("s", "sales", true, "Number of records for sales"); + CommandLine result = new DefaultParser().parse(options, args); + if (result.hasOption("help") || result.getArgs().length == 0) { + new HelpFormatter().printHelp("convert ", options); + System.exit(1); + } + return result; + } + + public static void main(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + String[] compressList = + cli.getOptionValue("compress", "none,snappy,zlib").split(","); + String[] dataList = + cli.getOptionValue("data", "taxi,sales,github").split(","); + String[] formatList = + cli.getOptionValue("format", "avro,json,orc,parquet").split(","); + long records = Long.parseLong(cli.getOptionValue("sales", "25000000")); + Configuration conf = new Configuration(); + Path root = new Path(cli.getArgs()[0]); + for(String data: dataList) { + // Set up the reader + TypeDescription schema = Utilities.loadSchema(data + ".schema"); + BatchReader reader = createReader(root, data, schema, conf, records); + + // Set up the writers for each combination + BatchWriter[] writers = new BatchWriter[compressList.length * formatList.length]; + for(int compress=0; compress < compressList.length; ++compress) { + CompressionKind compressionKind = + CompressionKind.valueOf(compressList[compress].toUpperCase()); + for(int format=0; format < formatList.length; ++format) { + Path outPath = Utilities.getVariant(root, data, formatList[format], + compressList[compress]); + writers[compress * formatList.length + format] = + createFileWriter(outPath, formatList[format], schema, conf, + compressionKind); + } + } + + // Copy the rows + VectorizedRowBatch batch = schema.createRowBatch(); + while (reader.nextBatch(batch)) { + for(BatchWriter writer: writers) { + writer.writeBatch(batch); + } + } + reader.close(); + for(BatchWriter writer: writers) { + writer.close(); + } + } + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java b/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java new file mode 100644 index 0000000000..ae76238e8e --- /dev/null +++ 
b/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.Utilities; + +/** + * A tool to create the different variants that we need to benchmark against. + */ +public class ScanVariants { + + + static CommandLine parseCommandLine(String[] args) throws ParseException { + Options options = new Options() + .addOption("h", "help", false, "Provide help") + .addOption("c", "compress", true, "List of compression") + .addOption("d", "data", true, "List of data sets") + .addOption("f", "format", true, "List of formats"); + CommandLine result = new DefaultParser().parse(options, args); + if (result.hasOption("help") || result.getArgs().length == 0) { + new HelpFormatter().printHelp("scan ", options); + System.exit(1); + } + return result; + } + + public static void main(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + String[] compressList = + cli.getOptionValue("compress", "none,snappy,zlib").split(","); + String[] dataList = + cli.getOptionValue("data", "taxi,sales,github").split(","); + String[] formatList = + cli.getOptionValue("format", "avro,json,orc,parquet").split(","); + Configuration conf = new Configuration(); + Path root = new Path(cli.getArgs()[0]); + for(String data: dataList) { + TypeDescription schema = Utilities.loadSchema(data + ".schema"); + VectorizedRowBatch batch = schema.createRowBatch(); + for (String compress : compressList) { + CompressionKind compressKind = + CompressionKind.valueOf(compress.toUpperCase()); + for (String format : formatList) { + Path filename = Utilities.getVariant(root, data, format, + compress); + BatchReader reader = GenerateVariants.createFileReader(filename, + format, schema, conf, compressKind); + long rows = 0; + long batches = 0; + while (reader.nextBatch(batch)) { + batches += 1; + rows += batch.size; + } + System.out.println(filename + " rows: " + rows + " batches: " + + batches); + reader.close(); + } + } + } + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java new file mode 100644 index 0000000000..fc354d6c49 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert.avro; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.avro.util.Utf8; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.List; + +public class AvroReader implements BatchReader { + private final DataFileReader dataFileReader; + private GenericRecord record = null; + private final AvroConverter[] converters; + + public AvroReader(Path path, + TypeDescription schema, + Configuration conf) throws IOException { + FsInput file = new FsInput(path, conf); + DatumReader datumReader = new GenericDatumReader<>(); + dataFileReader = new DataFileReader<>(file, datumReader); + List children = schema.getChildren(); + converters = new AvroConverter[children.size()]; + for(int c=0; c < converters.length; ++c) { + converters[c] = createConverter(children.get(c)); + } + } + + @Override + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (dataFileReader.hasNext() && batch.size < maxSize) { + record = dataFileReader.next(record); + int row = batch.size++; + for(int c=0; c < converters.length; ++c) { + converters[c].convert(batch.cols[c], row, record.get(c)); + } + } + return batch.size != 0; + } + + @Override + public void close() throws IOException { + dataFileReader.close(); + } + + interface AvroConverter { + void convert(ColumnVector vector, int row, Object value); + } + + private static class BooleanConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((Boolean) value).booleanValue() ? 
1 : 0; + } + } + } + + private static class IntConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((Integer) value).intValue(); + } + } + } + + private static class LongConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((Long) value).longValue(); + } + } + } + + private static class FloatConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((DoubleColumnVector) cv).vector[row] = + ((Float) value).floatValue(); + } + } + } + + private static class DoubleConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((DoubleColumnVector) cv).vector[row] = + ((Double) value).doubleValue(); + } + } + } + + private static class StringConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + byte[] bytes = ((Utf8) value).getBytes(); + ((BytesColumnVector) cv).setRef(row, bytes, 0, bytes.length); + } + } + } + + private static class BinaryConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ByteBuffer buf = (ByteBuffer) value; + ((BytesColumnVector) cv).setVal(row, buf.array(), buf.arrayOffset(), + buf.remaining()); + } + } + } + + private static class TimestampConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + TimestampColumnVector tc = (TimestampColumnVector) cv; + tc.time[row] = ((Long) value).longValue(); + tc.nanos[row] = 0; + } + } + } + + private static class DecimalConverter implements AvroConverter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + DecimalColumnVector tc = (DecimalColumnVector) cv; + tc.vector[row].set(getHiveDecimalFromByteBuffer((ByteBuffer) value, scale)); + } + } + } + + private static class ListConverter implements AvroConverter { + final AvroConverter childConverter; + + ListConverter(TypeDescription schema) { + childConverter = createConverter(schema.getChildren().get(0)); + } + + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ListColumnVector tc = (ListColumnVector) cv; + GenericData.Array array = (GenericData.Array) value; + int start = tc.childCount; + int len = array.size(); + tc.childCount += len; + tc.child.ensureSize(tc.childCount, true); + for(int i=0; i < len; ++i) { + childConverter.convert(tc.child, start + i, array.get(i)); + } + } + } + } + + private static class StructConverter implements AvroConverter { + final AvroConverter[] childConverters; + + StructConverter(TypeDescription schema) { + List children = 
schema.getChildren(); + childConverters = new AvroConverter[children.size()]; + for(int i=0; i < childConverters.length; ++i) { + childConverters[i] = createConverter(children.get(i)); + } + } + + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + StructColumnVector tc = (StructColumnVector) cv; + GenericData.Record record = (GenericData.Record) value; + for(int c=0; c < tc.fields.length; ++c) { + childConverters[c].convert(tc.fields[c], row, record.get(c)); + } + } + } + } + + static AvroConverter createConverter(TypeDescription types) { + switch (types.getCategory()) { + case BINARY: + return new BinaryConverter(); + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + case SHORT: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(types.getScale()); + case LIST: + return new ListConverter(types); + case STRUCT: + return new StructConverter(types); + default: + throw new IllegalArgumentException("Unhandled type " + types); + } + } + + static byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) { + byteBuffer.rewind(); + byte[] result = new byte[byteBuffer.limit()]; + byteBuffer.get(result); + return result; + } + + static HiveDecimal getHiveDecimalFromByteBuffer(ByteBuffer byteBuffer, + int scale) { + byte[] result = getBytesFromByteBuffer(byteBuffer); + HiveDecimal dec = HiveDecimal.create(new BigInteger(result), scale); + return dec; + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java new file mode 100644 index 0000000000..6c72a0ee61 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.bench.convert.avro; + +import org.apache.avro.Schema; +import org.apache.orc.TypeDescription; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * GenerateVariants Hive TypeInfo to an Avro Schema + */ +public class AvroSchemaUtils { + + private AvroSchemaUtils() { + // No instances + } + + public static Schema createAvroSchema(TypeDescription typeInfo) { + Schema schema; + switch (typeInfo.getCategory()) { + case STRING: + schema = Schema.create(Schema.Type.STRING); + break; + case CHAR: + schema = getSchemaFor("{" + + "\"type\":\"string\"," + + "\"logicalType\":\"char\"," + + "\"maxLength\":" + typeInfo.getMaxLength() + "}"); + break; + case VARCHAR: + schema = getSchemaFor("{" + + "\"type\":\"string\"," + + "\"logicalType\":\"varchar\"," + + "\"maxLength\":" + typeInfo.getMaxLength() + "}"); + break; + case BINARY: + schema = Schema.create(Schema.Type.BYTES); + break; + case BYTE: + schema = Schema.create(Schema.Type.INT); + break; + case SHORT: + schema = Schema.create(Schema.Type.INT); + break; + case INT: + schema = Schema.create(Schema.Type.INT); + break; + case LONG: + schema = Schema.create(Schema.Type.LONG); + break; + case FLOAT: + schema = Schema.create(Schema.Type.FLOAT); + break; + case DOUBLE: + schema = Schema.create(Schema.Type.DOUBLE); + break; + case BOOLEAN: + schema = Schema.create(Schema.Type.BOOLEAN); + break; + case DECIMAL: + String precision = String.valueOf(typeInfo.getPrecision()); + String scale = String.valueOf(typeInfo.getScale()); + schema = getSchemaFor("{" + + "\"type\":\"bytes\"," + + "\"logicalType\":\"decimal\"," + + "\"precision\":" + precision + "," + + "\"scale\":" + scale + "}"); + break; + case DATE: + schema = getSchemaFor("{" + + "\"type\":\"int\"," + + "\"logicalType\":\"date\"}"); + break; + case TIMESTAMP: + schema = getSchemaFor("{" + + "\"type\":\"long\"," + + "\"logicalType\":\"timestamp-millis\"}"); + break; + case LIST: + schema = createAvroArray(typeInfo); + break; + case MAP: + schema = createAvroMap(typeInfo); + break; + case STRUCT: + schema = createAvroRecord(typeInfo); + break; + case UNION: + schema = createAvroUnion(typeInfo); + break; + default: + throw new UnsupportedOperationException(typeInfo + " is not supported."); + } + + return schema; + } + + private static Schema createAvroUnion(TypeDescription typeInfo) { + List childSchemas = new ArrayList<>(); + for (TypeDescription childTypeInfo : typeInfo.getChildren()) { + Schema childSchema = createAvroSchema(childTypeInfo); + if (childSchema.getType() == Schema.Type.UNION) { + for (Schema grandkid: childSchema.getTypes()) { + if (childSchema.getType() != Schema.Type.NULL) { + childSchemas.add(grandkid); + } + } + } else { + childSchemas.add(childSchema); + } + } + + return wrapInUnionWithNull(Schema.createUnion(childSchemas)); + } + + private static Schema createAvroRecord(TypeDescription typeInfo) { + List childFields = new ArrayList<>(); + + List fieldNames = typeInfo.getFieldNames(); + List fieldTypes = typeInfo.getChildren(); + + for (int i = 0; i < fieldNames.size(); ++i) { + TypeDescription childTypeInfo = fieldTypes.get(i); + Schema.Field field = new Schema.Field(fieldNames.get(i), + wrapInUnionWithNull(createAvroSchema(childTypeInfo)), + childTypeInfo.toString(), + (Object) null); + childFields.add(field); + } + + Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(), + typeInfo.toString(), null, false); + recordSchema.setFields(childFields); + return recordSchema; + } + + private static 
Schema createAvroMap(TypeDescription typeInfo) { + TypeDescription keyTypeInfo = typeInfo.getChildren().get(0); + if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) { + throw new UnsupportedOperationException("Avro only supports maps with string keys " + + typeInfo); + } + + Schema valueSchema = wrapInUnionWithNull(createAvroSchema + (typeInfo.getChildren().get(1))); + + return Schema.createMap(valueSchema); + } + + private static Schema createAvroArray(TypeDescription typeInfo) { + Schema child = createAvroSchema(typeInfo.getChildren().get(0)); + return Schema.createArray(wrapInUnionWithNull(child)); + } + + private static Schema wrapInUnionWithNull(Schema schema) { + Schema NULL = Schema.create(Schema.Type.NULL); + switch (schema.getType()) { + case NULL: + return schema; + case UNION: + List kids = schema.getTypes(); + List newKids = new ArrayList<>(kids.size() + 1); + newKids.add(NULL); + return Schema.createUnion(newKids); + default: + return Schema.createUnion(Arrays.asList(NULL, schema)); + } + } + + private static Schema getSchemaFor(String str) { + Schema.Parser parser = new Schema.Parser(); + return parser.parse(str); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java new file mode 100644 index 0000000000..44defbf579 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.convert.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchWriter; +import org.apache.orc.bench.CompressionKind; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class AvroWriter implements BatchWriter { + + interface AvroConverter { + Object convert(ColumnVector vector, int row); + } + + private static class BooleanConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return vector.vector[row] != 0; + } else { + return null; + } + } + } + + private static class IntConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return (int) vector.vector[row]; + } else { + return null; + } + } + } + + private static class LongConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return vector.vector[row]; + } else { + return null; + } + } + } + + private static class FloatConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DoubleColumnVector vector = (DoubleColumnVector) cv; + return (float) vector.vector[row]; + } else { + return null; + } + } + } + + private static class DoubleConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DoubleColumnVector vector = (DoubleColumnVector) cv; + return vector.vector[row]; + } else { + return null; + } + } + } + + private static class StringConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return new String(vector.vector[row], vector.start[row], + vector.length[row], StandardCharsets.UTF_8); + } else { + return null; + } + } + } + + private static class BinaryConverter implements AvroConverter { + public Object 
convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return ByteBuffer.wrap(vector.vector[row], vector.start[row], + vector.length[row]); + } else { + return null; + } + } + } + + private static class TimestampConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + TimestampColumnVector vector = (TimestampColumnVector) cv; + return vector.time[row]; + } else { + return null; + } + } + } + + private static class DecimalConverter implements AvroConverter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DecimalColumnVector vector = (DecimalColumnVector) cv; + return getBufferFromDecimal( + vector.vector[row].getHiveDecimal(), scale); + } else { + return null; + } + } + } + + private static class ListConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter childConverter; + + ListConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + childConverter = createConverter(schema.getChildren().get(0), + removeNullable(avroSchema.getElementType())); + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + ListColumnVector vector = (ListColumnVector) cv; + int offset = (int) vector.offsets[row]; + int length = (int) vector.lengths[row]; + GenericData.Array result = new GenericData.Array(length, avroSchema); + for(int i=0; i < length; ++i) { + result.add(childConverter.convert(vector.child, offset + i)); + } + return result; + } else { + return null; + } + } + } + + private static class StructConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter[] childConverters; + + StructConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + List childrenTypes = schema.getChildren(); + childConverters = new AvroConverter[childrenTypes.size()]; + List fields = avroSchema.getFields(); + for(int f=0; f < childConverters.length; ++f) { + childConverters[f] = createConverter(childrenTypes.get(f), + removeNullable(fields.get(f).schema())); + } + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + StructColumnVector vector = (StructColumnVector) cv; + GenericData.Record result = new GenericData.Record(avroSchema); + for(int f=0; f < childConverters.length; ++f) { + result.put(f, childConverters[f].convert(vector.fields[f], row)); + } + return result; + } else { + return null; + } + } + } + + static AvroConverter createConverter(TypeDescription types, + Schema avroSchema) { + switch (types.getCategory()) { + case BINARY: + return new BinaryConverter(); + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + case SHORT: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(types.getScale()); + case LIST: + return new ListConverter(types, avroSchema); + 
case STRUCT: + return new StructConverter(types, avroSchema); + default: + throw new IllegalArgumentException("Unhandled type " + types); + } + } + + /** + * Remove the union(null, ...) wrapper around the schema. + * + * All of the types in Hive are nullable and in Avro those are represented + * by wrapping each type in a union type with the void type. + * @param avro The avro type + * @return The avro type with the nullable layer removed + */ + static Schema removeNullable(Schema avro) { + while (avro.getType() == Schema.Type.UNION) { + List children = avro.getTypes(); + if (children.size() == 2 && + children.get(0).getType() == Schema.Type.NULL) { + avro = children.get(1); + } else { + break; + } + } + return avro; + } + + private final AvroConverter[] converters; + private final DataFileWriter writer; + private final GenericRecord record; + + public AvroWriter(Path path, TypeDescription schema, + Configuration conf, + CompressionKind compression) throws IOException { + List childTypes = schema.getChildren(); + Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema); + List avroFields = avroSchema.getFields(); + converters = new AvroConverter[childTypes.size()]; + for(int c=0; c < converters.length; ++c) { + converters[c] = createConverter(childTypes.get(c), + removeNullable(avroFields.get(c).schema())); + } + GenericDatumWriter gdw = new GenericDatumWriter(avroSchema); + writer = new DataFileWriter(gdw); + switch (compression) { + case NONE: + break; + case ZLIB: + writer.setCodec(CodecFactory.deflateCodec(-1)); + break; + case SNAPPY: + writer.setCodec(CodecFactory.snappyCodec()); + break; + default: + throw new IllegalArgumentException("Compression unsupported " + compression); + } + writer.create(avroSchema, path.getFileSystem(conf).create(path)); + record = new GenericData.Record(avroSchema); + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for(int r=0; r < batch.size; ++r) { + for(int f=0; f < batch.cols.length; ++f) { + record.put(f, converters[f].convert(batch.cols[f], r)); + } + writer.append(record); + } + } + + public void close() throws IOException { + writer.close(); + } + + static Buffer getBufferFromBytes(byte[] input) { + ByteBuffer bb = ByteBuffer.wrap(input); + return bb.rewind(); + } + + public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { + if (dec == null) { + return null; + } + + dec = dec.setScale(scale); + return getBufferFromBytes(dec.unscaledValue().toByteArray()); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java new file mode 100644 index 0000000000..3246e69c1e --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.bench.convert.csv; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.Iterator; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class CsvReader implements BatchReader { + private final Iterator parser; + private final ColumnReader[] readers; + + interface ColumnReader { + void read(String value, ColumnVector vect, int row); + } + + static class LongColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = Long.parseLong(value); + } + } + } + + static class DoubleColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DoubleColumnVector vector = (DoubleColumnVector) vect; + vector.vector[row] = Double.parseDouble(value); + } + } + } + + static class StringColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class TimestampColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + TimestampColumnVector vector = (TimestampColumnVector) vect; + vector.set(row, Timestamp.valueOf(value)); + } + } + } + + static class DecimalColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DecimalColumnVector vector = (DecimalColumnVector) vect; + vector.vector[row].set(HiveDecimal.create(value)); + } + } + } + + ColumnReader createReader(TypeDescription schema) { + switch (schema.getCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return new LongColumnReader(); + case FLOAT: + case DOUBLE: + return new DoubleColumnReader(); 
+ case CHAR: + case VARCHAR: + case STRING: + return new StringColumnReader(); + case DECIMAL: + return new DecimalColumnReader(); + case TIMESTAMP: + return new TimestampColumnReader(); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } + + public CsvReader(Path path, + TypeDescription schema, + Configuration conf, + CompressionKind compress) throws IOException { + FileSystem fs = path.getFileSystem(conf); + InputStream input = compress.read(fs.open(path)); + parser = new CSVParser(new InputStreamReader(input, StandardCharsets.UTF_8), + CSVFormat.RFC4180.withHeader()).iterator(); + List columnTypes = schema.getChildren(); + readers = new ColumnReader[columnTypes.size()]; + int c = 0; + for(TypeDescription columnType: columnTypes) { + readers[c++] = createReader(columnType); + } + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (parser.hasNext() && batch.size < maxSize) { + CSVRecord record = parser.next(); + int c = 0; + for(String val: record) { + readers[c].read(val, batch.cols[c], batch.size); + c += 1; + } + batch.size++; + } + return batch.size != 0; + } + + public void close() { + // PASS + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java new file mode 100644 index 0000000000..b4ff3122bb --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.bench.convert.json; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonStreamParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class JsonReader implements BatchReader { + private final TypeDescription schema; + private final JsonStreamParser parser; + private final JsonConverter[] converters; + + public JsonReader(Path path, + TypeDescription schema, + Configuration conf, + CompressionKind compressionKind) throws IOException { + this.schema = schema; + FileSystem fs = path.getFileSystem(conf); + InputStream input = compressionKind.read(fs.open(path)); + parser = new JsonStreamParser(new InputStreamReader(input, + StandardCharsets.UTF_8)); + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Root must be struct - " + schema); + } + List fieldTypes = schema.getChildren(); + converters = new JsonConverter[fieldTypes.size()]; + for(int c = 0; c < converters.length; ++c) { + converters[c] = createConverter(fieldTypes.get(c)); + } + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + List fieldNames = schema.getFieldNames(); + while (parser.hasNext() && batch.size < maxSize) { + JsonObject elem = parser.next().getAsJsonObject(); + for(int c=0; c < converters.length; ++c) { + // look up each field to see if it is in the input, otherwise + // set it to null. + JsonElement field = elem.get(fieldNames.get(c)); + if (field == null) { + batch.cols[c].noNulls = false; + batch.cols[c].isNull[batch.size] = true; + } else { + converters[c].convert(field, batch.cols[c], batch.size); + } + } + batch.size++; + } + return batch.size != 0; + } + + public void close() { + // PASS + } + + interface JsonConverter { + void convert(JsonElement value, ColumnVector vect, int row); + } + + static class BooleanColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = value.getAsBoolean() ? 
1 : 0; + } + } + } + + static class LongColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = value.getAsLong(); + } + } + } + + static class DoubleColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DoubleColumnVector vector = (DoubleColumnVector) vect; + vector.vector[row] = value.getAsDouble(); + } + } + } + + static class StringColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + byte[] bytes = value.getAsString().getBytes(StandardCharsets.UTF_8); + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class BinaryColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + String binStr = value.getAsString(); + byte[] bytes = new byte[binStr.length()/2]; + for(int i=0; i < bytes.length; ++i) { + bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16); + } + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class TimestampColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + TimestampColumnVector vector = (TimestampColumnVector) vect; + vector.set(row, Timestamp.valueOf(value.getAsString() + .replaceAll("[TZ]", " "))); + } + } + } + + static class DecimalColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DecimalColumnVector vector = (DecimalColumnVector) vect; + vector.vector[row].set(HiveDecimal.create(value.getAsString())); + } + } + } + + static class StructColumnConverter implements JsonConverter { + private JsonConverter[] childrenConverters; + private List fieldNames; + + public StructColumnConverter(TypeDescription schema) { + List kids = schema.getChildren(); + childrenConverters = new JsonConverter[kids.size()]; + for(int c=0; c < childrenConverters.length; ++c) { + childrenConverters[c] = createConverter(kids.get(c)); + } + fieldNames = schema.getFieldNames(); + } + + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + StructColumnVector vector = (StructColumnVector) vect; + JsonObject obj = value.getAsJsonObject(); + for(int c=0; c < childrenConverters.length; ++c) { + JsonElement elem = obj.get(fieldNames.get(c)); + childrenConverters[c].convert(elem, vector.fields[c], row); + } + } + } + } + + static class ListColumnConverter implements JsonConverter { + private JsonConverter childrenConverter; + + public 
ListColumnConverter(TypeDescription schema) { + childrenConverter = createConverter(schema.getChildren().get(0)); + } + + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + ListColumnVector vector = (ListColumnVector) vect; + JsonArray obj = value.getAsJsonArray(); + vector.lengths[row] = obj.size(); + vector.offsets[row] = vector.childCount; + vector.childCount += vector.lengths[row]; + vector.child.ensureSize(vector.childCount, true); + for(int c=0; c < obj.size(); ++c) { + childrenConverter.convert(obj.get(c), vector.child, + (int) vector.offsets[row] + c); + } + } + } + } + + static JsonConverter createConverter(TypeDescription schema) { + switch (schema.getCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return new LongColumnConverter(); + case FLOAT: + case DOUBLE: + return new DoubleColumnConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringColumnConverter(); + case DECIMAL: + return new DecimalColumnConverter(); + case TIMESTAMP: + return new TimestampColumnConverter(); + case BINARY: + return new BinaryColumnConverter(); + case BOOLEAN: + return new BooleanColumnConverter(); + case STRUCT: + return new StructColumnConverter(schema); + case LIST: + return new ListColumnConverter(schema); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java new file mode 100644 index 0000000000..bd411154e4 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.convert.json; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchWriter; +import org.apache.orc.bench.CompressionKind; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class JsonWriter implements BatchWriter { + private final Writer outStream; + private final JsonGenerator writer; + private final TypeDescription schema; + + public JsonWriter(Path path, TypeDescription schema, + Configuration conf, + CompressionKind compression) throws IOException { + OutputStream file = path.getFileSystem(conf).create(path, true); + outStream = new OutputStreamWriter(compression.create(file), + StandardCharsets.UTF_8); + JsonFactory factory = new JsonFactory(); + factory.setRootValueSeparator("\n"); + writer = factory.createGenerator(outStream); + this.schema = schema; + } + + private static void printMap(JsonGenerator writer, + MapColumnVector vector, + TypeDescription schema, + int row) throws IOException { + writer.writeStartArray(); + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + int offset = (int) vector.offsets[row]; + for (int i = 0; i < vector.lengths[row]; ++i) { + writer.writeStartObject(); + writer.writeFieldName("_key"); + printValue(writer, vector.keys, keyType, offset + i); + writer.writeFieldName("_value"); + printValue(writer, vector.values, valueType, offset + i); + writer.writeEndObject(); + } + writer.writeEndArray(); + } + + private static void printList(JsonGenerator writer, + ListColumnVector vector, + TypeDescription schema, + int row) throws IOException { + writer.writeStartArray(); + int offset = (int) vector.offsets[row]; + TypeDescription childType = schema.getChildren().get(0); + for (int i = 0; i < vector.lengths[row]; ++i) { + printValue(writer, vector.child, childType, offset + i); + } + writer.writeEndArray(); + } + + private static void printUnion(JsonGenerator writer, + UnionColumnVector vector, + TypeDescription schema, + int row) throws IOException { + int tag = vector.tags[row]; + printValue(writer, vector.fields[tag], schema.getChildren().get(tag), row); + } + + static void printStruct(JsonGenerator writer, + StructColumnVector batch, + TypeDescription schema, + int row) throws IOException { + writer.writeStartObject(); + List fieldNames = schema.getFieldNames(); + List fieldTypes = schema.getChildren(); + for (int i = 0; i < fieldTypes.size(); ++i) { + 
writer.writeFieldName(fieldNames.get(i)); + printValue(writer, batch.fields[i], fieldTypes.get(i), row); + } + writer.writeEndObject(); + } + + static void printBinary(JsonGenerator writer, BytesColumnVector vector, + int row) throws IOException { + StringBuilder buffer = new StringBuilder(); + int offset = vector.start[row]; + for(int i=0; i < vector.length[row]; ++i) { + int value = 0xff & (int) vector.vector[row][offset + i]; + buffer.append(String.format("%02x", value)); + } + writer.writeString(buffer.toString()); + } + + static void printValue(JsonGenerator writer, ColumnVector vector, + TypeDescription schema, int row) throws IOException { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + switch (schema.getCategory()) { + case BOOLEAN: + writer.writeBoolean(((LongColumnVector) vector).vector[row] != 0); + break; + case BYTE: + case SHORT: + case INT: + case LONG: + writer.writeNumber(((LongColumnVector) vector).vector[row]); + break; + case FLOAT: + case DOUBLE: + writer.writeNumber(((DoubleColumnVector) vector).vector[row]); + break; + case STRING: + case CHAR: + case VARCHAR: + writer.writeString(((BytesColumnVector) vector).toString(row)); + break; + case BINARY: + printBinary(writer, (BytesColumnVector) vector, row); + break; + case DECIMAL: + writer.writeString(((DecimalColumnVector) vector).vector[row].toString()); + break; + case DATE: + writer.writeString(new DateWritable( + (int) ((LongColumnVector) vector).vector[row]).toString()); + break; + case TIMESTAMP: + writer.writeString(((TimestampColumnVector) vector) + .asScratchTimestamp(row).toString()); + break; + case LIST: + printList(writer, (ListColumnVector) vector, schema, row); + break; + case MAP: + printMap(writer, (MapColumnVector) vector, schema, row); + break; + case STRUCT: + printStruct(writer, (StructColumnVector) vector, schema, row); + break; + case UNION: + printUnion(writer, (UnionColumnVector) vector, schema, row); + break; + default: + throw new IllegalArgumentException("Unknown type " + + schema.toString()); + } + } else { + writer.writeNull(); + } + } + + static void printRow(JsonGenerator writer, + VectorizedRowBatch batch, + TypeDescription schema, + int row) throws IOException { + if (schema.getCategory() == TypeDescription.Category.STRUCT) { + List fieldTypes = schema.getChildren(); + List fieldNames = schema.getFieldNames(); + writer.writeStartObject(); + for (int c = 0; c < batch.cols.length; ++c) { + writer.writeFieldName(fieldNames.get(c)); + printValue(writer, batch.cols[c], fieldTypes.get(c), row); + } + writer.writeEndObject(); + } else { + printValue(writer, batch.cols[0], schema, row); + } + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for (int r = 0; r < batch.size; ++r) { + printRow(writer, batch, schema, r); + } + } + + public void close() throws IOException { + writer.close(); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java new file mode 100644 index 0000000000..e648856577 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; + +public class OrcReader implements BatchReader { + private final RecordReader reader; + + public OrcReader(Path path, + TypeDescription schema, + Configuration conf + ) throws IOException { + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + reader = file.rows(file.options().schema(schema)); + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + return reader.nextBatch(batch); + } + + public void close() throws IOException { + reader.close(); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java new file mode 100644 index 0000000000..af5de9b8c0 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.convert.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.bench.convert.BatchWriter; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.Utilities; + +import java.io.IOException; + +public class OrcWriter implements BatchWriter { + private final Writer writer; + + public OrcWriter(Path path, + TypeDescription schema, + Configuration conf, + CompressionKind compression + ) throws IOException { + writer = OrcFile.createWriter(path, + OrcFile.writerOptions(conf) + .setSchema(schema) + .compress(Utilities.getCodec(compression))); + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + writer.addRowBatch(batch); + } + + public void close() throws IOException { + writer.close(); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java new file mode 100644 index 0000000000..83f70f45e5 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.convert.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; +import java.util.List; + +public class ParquetReader implements BatchReader { + + private final NullWritable nada = NullWritable.get(); + private final RecordReader reader; + private final ArrayWritable value; + private final Converter[] converters; + + public ParquetReader(Path path, + TypeDescription schema, + Configuration conf) throws IOException { + FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}); + JobConf jobConf = new JobConf(conf); + reader = new MapredParquetInputFormat().getRecordReader(split, jobConf, + Reporter.NULL); + value = reader.createValue(); + converters = new Converter[schema.getChildren().size()]; + List children = schema.getChildren(); + for(int c = 0; c < converters.length; ++c) { + converters[c] = createConverter(children.get(c)); + } + } + + @Override + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (batch.size < maxSize && reader.next(nada, value)) { + Writable[] values = value.get(); + int row = batch.size++; + for(int c=0; c < batch.cols.length; ++c) { + converters[c].convert(batch.cols[c], row, values[c]); + } + } + return batch.size != 0; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + interface Converter { + void convert(ColumnVector vector, int row, Object value); + } + + private static class BooleanConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((BooleanWritable) value).get() ? 
1 : 0; + } + } + } + + private static class IntConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((IntWritable) value).get(); + } + } + } + + private static class LongConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((LongWritable) value).get(); + } + } + } + + private static class FloatConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((DoubleColumnVector) cv).vector[row] = + ((FloatWritable) value).get(); + } + } + } + + private static class DoubleConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((DoubleColumnVector) cv).vector[row] = + ((DoubleWritable) value).get(); + } + } + } + + private static class StringConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + Text castValue = (Text) value; + ((BytesColumnVector) cv).setVal(row, castValue.getBytes(), 0, + castValue.getLength()); + } + } + } + + private static class BinaryConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + BytesWritable buf = (BytesWritable) value; + ((BytesColumnVector) cv).setVal(row, buf.getBytes(), 0, + buf.getLength()); + } + } + } + + private static class TimestampConverter implements Converter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + TimestampColumnVector tc = (TimestampColumnVector) cv; + tc.time[row] = ((TimestampWritable) value).getSeconds(); + tc.nanos[row] = ((TimestampWritable) value).getNanos(); + } + } + } + + private static class DecimalConverter implements Converter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + DecimalColumnVector tc = (DecimalColumnVector) cv; + tc.vector[row].set((HiveDecimalWritable) value); + } + } + } + + private static class ListConverter implements Converter { + final Converter childConverter; + + ListConverter(TypeDescription schema) { + childConverter = createConverter(schema.getChildren().get(0)); + } + + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ListColumnVector tc = (ListColumnVector) cv; + Writable[] array = ((ArrayWritable) value).get(); + int start = tc.childCount; + int len = array.length; + tc.childCount += len; + tc.child.ensureSize(tc.childCount, true); + for(int i=0; i < len; ++i) { + childConverter.convert(tc.child, start + i, array[i]); + } + } + } + } + + private static class StructConverter implements Converter { + final Converter[] childConverters; + + StructConverter(TypeDescription schema) { + List children = schema.getChildren(); + 
childConverters = new Converter[children.size()]; + for(int i=0; i < childConverters.length; ++i) { + childConverters[i] = createConverter(children.get(i)); + } + } + + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + StructColumnVector tc = (StructColumnVector) cv; + Writable[] record = ((ArrayWritable) value).get(); + for(int c=0; c < tc.fields.length; ++c) { + childConverters[c].convert(tc.fields[c], row, record[c]); + } + } + } + } + + static Converter createConverter(TypeDescription types) { + switch (types.getCategory()) { + case BINARY: + return new BinaryConverter(); + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + case SHORT: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(types.getScale()); + case LIST: + return new ListConverter(types); + case STRUCT: + return new StructConverter(types); + default: + throw new IllegalArgumentException("Unhandled type " + types); + } + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java new file mode 100644 index 0000000000..075060e042 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.convert.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcBenchmarkUtilities; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchWriter; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.Utilities; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; +import java.util.Properties; + +public class ParquetWriter implements BatchWriter { + private final FileSinkOperator.RecordWriter writer; + private final TypeDescription schema; + private final ParquetHiveRecord record; + + public ParquetWriter(Path path, + TypeDescription schema, + Configuration conf, + CompressionKind compression + ) throws IOException { + JobConf jobConf = new JobConf(conf); + Properties tableProperties = Utilities.convertSchemaToHiveConfig(schema); + this.schema = schema; + jobConf.set(ParquetOutputFormat.COMPRESSION, getCodec(compression).name()); + writer = new MapredParquetOutputFormat().getHiveRecordWriter(jobConf, path, + ParquetHiveRecord.class, compression != CompressionKind.NONE, + tableProperties, Reporter.NULL); + record = new ParquetHiveRecord(null, + OrcBenchmarkUtilities.createObjectInspector(schema)); + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for(int r=0; r < batch.size; ++r) { + record.value = OrcBenchmarkUtilities.nextObject(batch, schema, r, + (Writable) record.value); + writer.write(record); + } + } + + public void close() throws IOException { + writer.close(false); + } + + public static CompressionCodecName getCodec(CompressionKind kind) { + switch (kind) { + case NONE: + return CompressionCodecName.UNCOMPRESSED; + case ZLIB: + return CompressionCodecName.GZIP; + case SNAPPY: + return CompressionCodecName.SNAPPY; + default: + throw new IllegalArgumentException("Unsupported codec " + kind); + } + } +} diff --git a/java/bench/src/main/resources/github.schema b/java/bench/src/main/resources/github.schema new file mode 100644 index 0000000000..3b7dd15fcf --- /dev/null +++ b/java/bench/src/main/resources/github.schema @@ -0,0 +1,702 @@ +struct< + actor:struct < + avatar_url: string, + gravatar_id: string, + id: int, + login: string, + url: string>, + created_at:timestamp, + id:binary, + org:struct < + avatar_url: string, + gravatar_id: string, + id: int, + login: string, + url: string>, + payload:struct < + action: string, + before: binary, + comment: struct < + _links: struct < + html: struct < + href: string>, + pull_request: struct < + href: string>, + self: struct < + href: string>>, + body: string, + commit_id: binary, + created_at: timestamp, + diff_hunk: string, + html_url: string, + id: int, + issue_url: string, + line: int, + original_commit_id: binary, + original_position: int, + path: string, + position: int, + pull_request_url: string, + updated_at: timestamp, + url: string, + user: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + 
gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>>, + commits: array , + distinct: boolean, + message: string, + sha: binary, + url: string>>, + description: string, + distinct_size: int, + forkee: struct < + archive_url: string, + assignees_url: string, + blobs_url: string, + branches_url: string, + clone_url: string, + collaborators_url: string, + comments_url: string, + commits_url: string, + compare_url: string, + contents_url: string, + contributors_url: string, + created_at: timestamp, + default_branch: string, + description: string, + downloads_url: string, + events_url: string, + fork: boolean, + forks: int, + forks_count: int, + forks_url: string, + full_name: string, + git_commits_url: string, + git_refs_url: string, + git_tags_url: string, + git_url: string, + has_downloads: boolean, + has_issues: boolean, + has_pages: boolean, + has_wiki: boolean, + homepage: string, + hooks_url: string, + html_url: string, + id: int, + issue_comment_url: string, + issue_events_url: string, + issues_url: string, + keys_url: string, + labels_url: string, + language: string, + languages_url: string, + merges_url: string, + milestones_url: string, + mirror_url: string, + name: string, + notifications_url: string, + open_issues: int, + open_issues_count: int, + owner: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + private: boolean, + public: boolean, + pulls_url: string, + pushed_at: timestamp, + releases_url: string, + size: int, + ssh_url: string, + stargazers_count: int, + stargazers_url: string, + statuses_url: string, + subscribers_url: string, + subscription_url: string, + svn_url: string, + tags_url: string, + teams_url: string, + trees_url: string, + updated_at: timestamp, + url: string, + watchers: int, + watchers_count: int>, + head: binary, + issue: struct < + assignee: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + body: string, + closed_at: timestamp, + comments: int, + comments_url: string, + created_at: timestamp, + events_url: string, + html_url: string, + id: int, + labels: array >, + labels_url: string, + locked: boolean, + milestone: struct < + closed_at: timestamp, + closed_issues: int, + created_at: timestamp, + creator: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + description: string, + due_on: timestamp, + html_url: string, + id: int, + labels_url: string, + number: int, + 
open_issues: int, + state: string, + title: string, + updated_at: timestamp, + url: string>, + number: int, + pull_request: struct < + diff_url: string, + html_url: string, + patch_url: string, + url: string>, + state: string, + title: string, + updated_at: timestamp, + url: string, + user: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>>, + master_branch: string, + member: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + number: int, + pages: array >, + pull_request: struct < + _links: struct < + comments: struct < + href: string>, + commits: struct < + href: string>, + html: struct < + href: string>, + issue: struct < + href: string>, + review_comment: struct < + href: string>, + review_comments: struct < + href: string>, + self: struct < + href: string>, + statuses: struct < + href: string>>, + additions: int, + assignee: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + base: struct < + label: string, + ref: string, + repo: struct < + archive_url: string, + assignees_url: string, + blobs_url: string, + branches_url: string, + clone_url: string, + collaborators_url: string, + comments_url: string, + commits_url: string, + compare_url: string, + contents_url: string, + contributors_url: string, + created_at: timestamp, + default_branch: string, + description: string, + downloads_url: string, + events_url: string, + fork: boolean, + forks: int, + forks_count: int, + forks_url: string, + full_name: string, + git_commits_url: string, + git_refs_url: string, + git_tags_url: string, + git_url: string, + has_downloads: boolean, + has_issues: boolean, + has_pages: boolean, + has_wiki: boolean, + homepage: string, + hooks_url: string, + html_url: string, + id: int, + issue_comment_url: string, + issue_events_url: string, + issues_url: string, + keys_url: string, + labels_url: string, + language: string, + languages_url: string, + merges_url: string, + milestones_url: string, + mirror_url: string, + name: string, + notifications_url: string, + open_issues: int, + open_issues_count: int, + owner: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + private: boolean, + pulls_url: string, + pushed_at: timestamp, + releases_url: string, + size: int, + ssh_url: string, + stargazers_count: 
int, + stargazers_url: string, + statuses_url: string, + subscribers_url: string, + subscription_url: string, + svn_url: string, + tags_url: string, + teams_url: string, + trees_url: string, + updated_at: timestamp, + url: string, + watchers: int, + watchers_count: int>, + sha: binary, + user: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>>, + body: string, + changed_files: int, + closed_at: timestamp, + comments: int, + comments_url: string, + commits: int, + commits_url: string, + created_at: timestamp, + deletions: int, + diff_url: string, + head: struct < + label: string, + ref: string, + repo: struct < + archive_url: string, + assignees_url: string, + blobs_url: string, + branches_url: string, + clone_url: string, + collaborators_url: string, + comments_url: string, + commits_url: string, + compare_url: string, + contents_url: string, + contributors_url: string, + created_at: timestamp, + default_branch: string, + description: string, + downloads_url: string, + events_url: string, + fork: boolean, + forks: int, + forks_count: int, + forks_url: string, + full_name: string, + git_commits_url: string, + git_refs_url: string, + git_tags_url: string, + git_url: string, + has_downloads: boolean, + has_issues: boolean, + has_pages: boolean, + has_wiki: boolean, + homepage: string, + hooks_url: string, + html_url: string, + id: int, + issue_comment_url: string, + issue_events_url: string, + issues_url: string, + keys_url: string, + labels_url: string, + language: string, + languages_url: string, + merges_url: string, + milestones_url: string, + mirror_url: string, + name: string, + notifications_url: string, + open_issues: int, + open_issues_count: int, + owner: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + private: boolean, + pulls_url: string, + pushed_at: timestamp, + releases_url: string, + size: int, + ssh_url: string, + stargazers_count: int, + stargazers_url: string, + statuses_url: string, + subscribers_url: string, + subscription_url: string, + svn_url: string, + tags_url: string, + teams_url: string, + trees_url: string, + updated_at: timestamp, + url: string, + watchers: int, + watchers_count: int>, + sha: binary, + user: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>>, + html_url: string, + id: int, + issue_url: string, + locked: boolean, + merge_commit_sha: string, + mergeable: boolean, + mergeable_state: string, + merged: boolean, + merged_at: timestamp, + merged_by: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, 
+ gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + milestone: struct < + closed_at: timestamp, + closed_issues: int, + created_at: timestamp, + creator: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + description: string, + due_on: timestamp, + html_url: string, + id: int, + labels_url: string, + number: int, + open_issues: int, + state: string, + title: string, + updated_at: timestamp, + url: string>, + number: int, + patch_url: string, + review_comment_url: string, + review_comments: int, + review_comments_url: string, + state: string, + statuses_url: string, + title: string, + updated_at: timestamp, + url: string, + user: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>>, + push_id: int, + pusher_type: string, + ref: string, + ref_type: string, + release: struct < + assets: array , + url: string>>, + assets_url: string, + author: struct < + avatar_url: string, + events_url: string, + followers_url: string, + following_url: string, + gists_url: string, + gravatar_id: string, + html_url: string, + id: int, + login: string, + organizations_url: string, + received_events_url: string, + repos_url: string, + site_admin: boolean, + starred_url: string, + subscriptions_url: string, + type: string, + url: string>, + body: string, + created_at: timestamp, + draft: boolean, + html_url: string, + id: int, + name: string, + prerelease: boolean, + published_at: timestamp, + tag_name: string, + tarball_url: string, + target_commitish: string, + upload_url: string, + url: string, + zipball_url: string>, + size: int>, + public: boolean, + repo: struct < + id: int, + name: string, + url: string>, + type: string +> \ No newline at end of file diff --git a/java/bench/src/main/resources/log4j.properties b/java/bench/src/main/resources/log4j.properties new file mode 100644 index 0000000000..363917c3e5 --- /dev/null +++ b/java/bench/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +log4j.rootLogger=WARN, CONSOLE + +# CONSOLE is set to be a ConsoleAppender using a PatternLayout +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %m%n \ No newline at end of file diff --git a/java/bench/src/main/resources/sales.schema b/java/bench/src/main/resources/sales.schema new file mode 100644 index 0000000000..df96409c2b --- /dev/null +++ b/java/bench/src/main/resources/sales.schema @@ -0,0 +1,56 @@ +struct< + sales_id:bigint, + customer_id:bigint, + col3:bigint, + item_category:bigint, + item_count:bigint, + change_ts:timestamp, + store_location:string, + associate_id:string, + col9:bigint, + rebate_id:string, + create_ts:timestamp, + 
col13:bigint, + size:string, + col14:bigint, + fulfilled:boolean, + global_id:string, + col17:string, + col18:string, + col19:bigint, + has_rebate:boolean, + col21:array< + struct< + sub1:bigint, + sub2:string, + sub3:string, + sub4:bigint, + sub5:bigint, + sub6:string>>, + vendor_id:string, + country:string, + backend_version:string, + col41:bigint, + col42:bigint, + col43:bigint, + col44:bigint, + col45:bigint, + col46:bigint, + col47:bigint, + col48:bigint, + col49:string, + col50:string, + col51:bigint, + col52:bigint, + col53:bigint, + col54:bigint, + col55:string, + col56:timestamp, + col57:timestamp, + md5:bigint, + col59:bigint, + col69:timestamp, + col61:string, + col62:string, + col63:timestamp, + col64:bigint> diff --git a/java/bench/src/main/resources/taxi.schema b/java/bench/src/main/resources/taxi.schema new file mode 100644 index 0000000000..5eb7c0fb8b --- /dev/null +++ b/java/bench/src/main/resources/taxi.schema @@ -0,0 +1,21 @@ +struct< + vendor_id:int, + pickup_time: timestamp, + dropoff_time: timestamp, + passenger_count: int, + trip_distance: double, + pickup_longitude: double, + pickup_latitude: double, + ratecode_id: int, + store_and_fwd_flag: string, + dropoff_longitude: double, + dropoff_latitude: double, + payment_type: int, + fare_amount: decimal(8,2), + extra: decimal(8,2), + mta_tax: decimal(8,2), + tip_amount: decimal(8,2), + tolls_amount: decimal(8,2), + improvement_surcharge : decimal(8,2), + total_amount: decimal(8,2) +> \ No newline at end of file diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java index cced4144cd..b0857f0f48 100644 --- a/java/core/src/java/org/apache/orc/TypeDescription.java +++ b/java/core/src/java/org/apache/orc/TypeDescription.java @@ -749,7 +749,7 @@ private int assignIds(int startId) { return startId; } - private TypeDescription(Category category) { + public TypeDescription(Category category) { this.category = category; if (category.isPrimitive) { children = null; diff --git a/java/pom.xml b/java/pom.xml index 7bebd1f430..02af9384a3 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -60,6 +60,7 @@ core mapreduce tools + bench @@ -69,7 +70,9 @@ ${project.build.directory}/testing-tmp ${project.basedir}/../../examples + 1.8.1 2.6.4 + 2.1.1 2.2.1 3.4.6 @@ -270,6 +273,11 @@ guava 11.0.2 + + com.fasterxml.jackson.core + jackson-core + 2.8.4 + com.google.protobuf protobuf-java @@ -295,6 +303,22 @@ aircompressor 0.3 + + org.apache.avro + avro + ${avro.version} + + + org.apache.avro + avro-mapred + hadoop2 + ${avro.version} + + + org.apache.commons + commons-csv + 1.4 + org.apache.hadoop hadoop-common @@ -362,6 +386,52 @@ + + org.apache.hive + hive-common + ${hive.version} + + + org.apache.hive + hive-storage-api + + + org.apache.hive + hive-orc + + + + + org.apache.hive + hive-exec + ${hive.version} + core + + + org.apache.hive + hive-storage-api + + + org.apache.hive + hive-orc + + + + + org.apache.hive + hive-serde + ${hive.version} + + + org.apache.hive + hive-storage-api + + + org.apache.hive + hive-orc + + + org.apache.hadoop hadoop-hdfs @@ -461,6 +531,11 @@ + + org.apache.parquet + parquet-hadoop-bundle + 1.8.2 + org.codehaus.jettison jettison @@ -472,11 +547,27 @@ + + org.jodd + jodd-core + 3.5.2 + runtime + + + org.openjdk.jmh + jmh-core + 1.18 + org.slf4j slf4j-api 1.7.5 + + org.slf4j + slf4j-simple + 1.7.5 + diff --git a/java/tools/src/assembly/uber.xml b/java/tools/src/assembly/uber.xml index 2c45350ebf..014eab951b 100644 --- a/java/tools/src/assembly/uber.xml 
+++ b/java/tools/src/assembly/uber.xml
@@ -19,15 +19,12 @@
   <includeBaseDirectory>false</includeBaseDirectory>
   <dependencySets>
     <dependencySet>
+      <outputDirectory>/</outputDirectory>
+      <useProjectArtifact>true</useProjectArtifact>
       <unpack>true</unpack>
       <scope>runtime</scope>
     </dependencySet>
   </dependencySets>
-  <fileSets>
-    <fileSet>
-      <directory>${project.build.outputDirectory}</directory>
-    </fileSet>
-  </fileSets>
   <containerDescriptorHandlers>
     <containerDescriptorHandler>
       <handlerName>metaInf-services</handlerName>
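
Usage note (not part of the patch): the sketch below shows one way the converter classes added above could be wired together by hand, reading an ORC file with OrcReader and rewriting it as gzip-compressed JSON with JsonWriter. The file paths, the inline schema, and the class name OrcToJsonSketch are illustrative placeholders only; in the benchmark itself these classes are driven by org.apache.orc.bench.Driver using the schema files under src/main/resources.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.CompressionKind;
import org.apache.orc.bench.convert.json.JsonWriter;
import org.apache.orc.bench.convert.orc.OrcReader;

public class OrcToJsonSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The schema must match the ORC file being read; a trivial example schema here.
    TypeDescription schema =
        TypeDescription.fromString("struct<id:bigint,name:string>");
    VectorizedRowBatch batch = schema.createRowBatch();
    // Placeholder paths; the benchmark keeps its generated files under data/.
    OrcReader reader =
        new OrcReader(new Path("data/generated/example.orc"), schema, conf);
    JsonWriter writer =
        new JsonWriter(new Path("data/generated/example.json.gz"),
            schema, conf, CompressionKind.ZLIB);
    // Copy batches from the ORC reader to the JSON writer until exhausted.
    while (reader.nextBatch(batch)) {
      writer.writeBatch(batch);
    }
    writer.close();
    reader.close();
  }
}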