diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java index 0af85c7014..76822367d2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java @@ -72,6 +72,14 @@ private static abstract class Binding { */ abstract void skip(); + /** + * Skips n values from the underlying page + * + * @param n + * the number of values to be skipped + */ + abstract void skip(int n); + /** * write current value to converter */ @@ -163,6 +171,10 @@ void read() { public void skip() { dataColumn.skip(); } + @Override + void skip(int n) { + dataColumn.skip(n); + } public int getDictionaryId() { return dictionaryId; } @@ -203,6 +215,11 @@ public void skip() { current = 0; dataColumn.skip(); } + @Override + void skip(int n) { + current = 0; + dataColumn.skip(n); + } public float getFloat() { return current; } @@ -222,6 +239,11 @@ public void skip() { current = 0; dataColumn.skip(); } + @Override + void skip(int n) { + current = 0; + dataColumn.skip(n); + } public double getDouble() { return current; } @@ -242,6 +264,11 @@ public void skip() { dataColumn.skip(); } @Override + void skip(int n) { + current = 0; + dataColumn.skip(n); + } + @Override public int getInteger() { return current; } @@ -262,6 +289,11 @@ public void skip() { dataColumn.skip(); } @Override + void skip(int n) { + current = 0; + dataColumn.skip(n); + } + @Override public long getLong() { return current; } @@ -291,6 +323,11 @@ public void skip() { dataColumn.skip(); } @Override + void skip(int n) { + current = false; + dataColumn.skip(n); + } + @Override public boolean getBoolean() { return current; } @@ -311,6 +348,11 @@ public void skip() { dataColumn.skip(); } @Override + void skip(int n) { + current = null; + dataColumn.skip(n); + } + @Override public Binary getBinary() { return current; } @@ -511,6 +553,7 @@ public int getCurrentDefinitionLevel() { private void checkRead() { int rl, dl; + int skipValues = 0; for (;;) { if (isPageFullyConsumed()) { if (isFullyConsumed()) { @@ -519,6 +562,7 @@ private void checkRead() { return; } readPage(); + skipValues = 0; } rl = repetitionLevelColumn.nextInt(); dl = definitionLevelColumn.nextInt(); @@ -527,9 +571,10 @@ private void checkRead() { break; } if (dl == maxDefinitionLevel) { - binding.skip(); + ++skipValues; } } + binding.skip(skipValues); repetitionLevel = rl; definitionLevel = dl; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java index 57326607b5..3167d82f72 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java @@ -109,5 +109,17 @@ public long readLong() { * Skips the next value in the page */ abstract public void skip(); + + /** + * Skips the next n value in the page + * + * @param n + * the number of values to be skipped + */ + public void skip(int n) { + for (int i = 0; i < n; ++i) { + skip(); + } + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java index dceaa526f5..58e02f2767 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java @@ -88,6 +88,14 @@ public void skip() { valuesRead++; } + @Override + public void skip(int n) { + // checkRead() is invoked before incrementing valuesRead so increase valuesRead size in 2 steps + valuesRead += n - 1; + checkRead(); + ++valuesRead; + } + @Override public int readInteger() { // TODO: probably implement it separately diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java index 1a2ccb9b53..4dbbcb5645 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java @@ -20,8 +20,6 @@ import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -64,7 +62,15 @@ public Binary readBytes() { @Override public void skip() { - int length = lengthReader.readInteger(); + skip(1); + } + + @Override + public void skip(int n) { + int length = 0; + for (int i = 0; i < n; ++i) { + length += lengthReader.readInteger(); + } try { in.skipFully(length); } catch (IOException e) { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index 15ed43438f..631c9084d1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -19,7 +19,6 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -51,8 +50,13 @@ public Binary readBytes() { @Override public void skip() { + skip(1); + } + + @Override + public void skip(int n) { try { - in.skipFully(length); + in.skipFully(n * length); } catch (IOException | RuntimeException e) { throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index f576528a98..127817eb0c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -41,14 +41,26 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO this.in = new LittleEndianDataInputStream(stream.remainingStream()); } + @Override + public void skip() { + skip(1); + } + + void skipBytesFully(int n) throws IOException { + int skipped = 0; + while (skipped < n) { + skipped += in.skipBytes(n - skipped); + } + } + public static class DoublePlainValuesReader extends PlainValuesReader { @Override - public void skip() { + public void skip(int n) { try { - in.skipBytes(8); + skipBytesFully(n * 8); } catch (IOException e) { - throw new ParquetDecodingException("could not skip double", e); + throw new ParquetDecodingException("could not skip " + n + " double values", e); } } @@ -65,11 +77,11 @@ public double readDouble() { public static class FloatPlainValuesReader extends PlainValuesReader { @Override - public void skip() { + public void skip(int n) { try { - in.skipBytes(4); + skipBytesFully(n * 4); } catch (IOException e) { - throw new ParquetDecodingException("could not skip float", e); + throw new ParquetDecodingException("could not skip " + n + " floats", e); } } @@ -86,11 +98,11 @@ public float readFloat() { public static class IntegerPlainValuesReader extends PlainValuesReader { @Override - public void skip() { + public void skip(int n) { try { - in.skipBytes(4); + in.skipBytes(n * 4); } catch (IOException e) { - throw new ParquetDecodingException("could not skip int", e); + throw new ParquetDecodingException("could not skip " + n + " ints", e); } } @@ -107,11 +119,11 @@ public int readInteger() { public static class LongPlainValuesReader extends PlainValuesReader { @Override - public void skip() { + public void skip(int n) { try { - in.skipBytes(8); + in.skipBytes(n * 8); } catch (IOException e) { - throw new ParquetDecodingException("could not skip long", e); + throw new ParquetDecodingException("could not skip " + n + " longs", e); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java index fe00de999e..8039cf9e4d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java @@ -19,8 +19,6 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; @@ -43,4 +41,8 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO public void skip() { } + @Override + public void skip(int n) { + } + }