Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ private static abstract class Binding {
*/
abstract void skip();

/**
 * Skips n values from the underlying page in one call.
 *
 * @param n
 *          the number of values to be skipped
 */
abstract void skip(int n);

/**
* write current value to converter
*/
Expand Down Expand Up @@ -163,6 +171,10 @@ void read() {
public void skip() {
// Skipping a single value is delegated directly to the data column reader.
dataColumn.skip();
}
/**
 * Skips n values by delegating to dataColumn.skip(n).
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
dataColumn.skip(n);
}
/**
 * @return the value currently held in dictionaryId
 */
public int getDictionaryId() {
return dictionaryId;
}
Expand Down Expand Up @@ -203,6 +215,11 @@ public void skip() {
current = 0;
dataColumn.skip();
}
/**
 * Resets the cached value to 0, then skips n values on dataColumn.
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
// Clear the cached value first so a later getFloat() does not return a stale read.
current = 0;
dataColumn.skip(n);
}
/**
 * @return the cached value held in current
 */
public float getFloat() {
return current;
}
Expand All @@ -222,6 +239,11 @@ public void skip() {
current = 0;
dataColumn.skip();
}
/**
 * Resets the cached value to 0, then skips n values on dataColumn.
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
// Clear the cached value first so a later getDouble() does not return a stale read.
current = 0;
dataColumn.skip(n);
}
/**
 * @return the cached value held in current
 */
public double getDouble() {
return current;
}
Expand All @@ -242,6 +264,11 @@ public void skip() {
dataColumn.skip();
}
/**
 * Resets the cached value to 0, then skips n values on dataColumn.
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
// Clear the cached value first so a later getInteger() does not return a stale read.
current = 0;
dataColumn.skip(n);
}
/**
 * @return the cached value held in current
 */
@Override
public int getInteger() {
return current;
}
Expand All @@ -262,6 +289,11 @@ public void skip() {
dataColumn.skip();
}
/**
 * Resets the cached value to 0, then skips n values on dataColumn.
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
// Clear the cached value first so a later getLong() does not return a stale read.
current = 0;
dataColumn.skip(n);
}
/**
 * @return the cached value held in current
 */
@Override
public long getLong() {
return current;
}
Expand Down Expand Up @@ -291,6 +323,11 @@ public void skip() {
dataColumn.skip();
}
/**
 * Resets the cached value to false, then skips n values on dataColumn.
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
// Clear the cached value first so a later getBoolean() does not return a stale read.
current = false;
dataColumn.skip(n);
}
/**
 * @return the cached value held in current
 */
@Override
public boolean getBoolean() {
return current;
}
Expand All @@ -311,6 +348,11 @@ public void skip() {
dataColumn.skip();
}
/**
 * Resets the cached value to null, then skips n values on dataColumn.
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
void skip(int n) {
// Drop the cached Binary first so a later getBinary() does not return a stale read.
current = null;
dataColumn.skip(n);
}
/**
 * @return the cached value held in current; null after a skip
 */
@Override
public Binary getBinary() {
return current;
}
Expand Down Expand Up @@ -511,6 +553,7 @@ public int getCurrentDefinitionLevel() {

private void checkRead() {
int rl, dl;
int skipValues = 0;
for (;;) {
if (isPageFullyConsumed()) {
if (isFullyConsumed()) {
Expand All @@ -519,6 +562,7 @@ private void checkRead() {
return;
}
readPage();
skipValues = 0;
}
rl = repetitionLevelColumn.nextInt();
dl = definitionLevelColumn.nextInt();
Expand All @@ -527,9 +571,10 @@ private void checkRead() {
break;
}
if (dl == maxDefinitionLevel) {
binding.skip();
++skipValues;
}
}
binding.skip(skipValues);
repetitionLevel = rl;
definitionLevel = dl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,17 @@ public long readLong() {
* Skips the next value in the page
*/
abstract public void skip();

/**
 * Skips the next n values in the page.
 *
 * This default implementation simply calls {@link #skip()} n times; a
 * non-positive n skips nothing.
 *
 * @param n
 *          the number of values to be skipped
 */
public void skip(int n) {
  int remaining = n;
  while (remaining > 0) {
    skip();
    remaining--;
  }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ public void skip() {
valuesRead++;
}

/**
 * Skips the next n values by advancing the valuesRead counter.
 *
 * The net change to valuesRead is +n, provided checkRead() does not itself
 * modify the counter (checkRead() is not visible here — TODO confirm).
 *
 * @param n
 *          the number of values to be skipped
 */
@Override
public void skip(int n) {
// checkRead() is invoked before incrementing valuesRead, so the counter is
// advanced in two steps: first to the last value being skipped (n - 1),
// then, after the check, one more to step past that value.
valuesRead += n - 1;
checkRead();
++valuesRead;
}

@Override
public int readInteger() {
// TODO: probably implement it separately
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@


import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
Expand Down Expand Up @@ -64,7 +62,15 @@ public Binary readBytes() {

/**
 * Skips the next value in the page by delegating to {@link #skip(int)} with
 * a count of one.
 */
@Override
public void skip() {
  // The value's length is consumed inside skip(int). Reading it here as well
  // would advance lengthReader twice for a single skipped value.
  skip(1);
}

@Override
public void skip(int n) {
int length = 0;
for (int i = 0; i < n; ++i) {
length += lengthReader.readInteger();
}
try {
in.skipFully(length);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.parquet.column.values.plain;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
Expand Down Expand Up @@ -51,8 +50,13 @@ public Binary readBytes() {

/**
 * Skips the next value by delegating to skip(int) with a count of one.
 */
@Override
public void skip() {
skip(1);
}

@Override
public void skip(int n) {
try {
in.skipFully(length);
in.skipFully(n * length);
} catch (IOException | RuntimeException e) {
throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,26 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
this.in = new LittleEndianDataInputStream(stream.remainingStream());
}

/**
 * Skips the next value by delegating to skip(int) with a count of one.
 */
@Override
public void skip() {
skip(1);
}

/**
 * Skips exactly n bytes of the underlying stream, retrying while the stream
 * skips fewer bytes than requested.
 *
 * @param n
 *          the number of bytes to skip; a non-positive value skips nothing
 * @throws IOException
 *           if the stream fails, or (as {@link java.io.EOFException}) if the
 *           stream stops making progress before n bytes have been skipped
 */
void skipBytesFully(int n) throws IOException {
  int skipped = 0;
  while (skipped < n) {
    int step = in.skipBytes(n - skipped);
    // A skip that makes no progress would otherwise spin this loop forever
    // (or walk the counter backwards on a negative return).
    // NOTE(review): this treats a non-positive return as end of data — confirm
    // that holds for the stream type used here.
    if (step <= 0) {
      throw new java.io.EOFException(
          "could not skip " + n + " bytes: no progress after " + skipped + " bytes");
    }
    skipped += step;
  }
}

public static class DoublePlainValuesReader extends PlainValuesReader {

@Override
public void skip() {
public void skip(int n) {
try {
in.skipBytes(8);
skipBytesFully(n * 8);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip double", e);
throw new ParquetDecodingException("could not skip " + n + " double values", e);
}
}

Expand All @@ -65,11 +77,11 @@ public double readDouble() {
public static class FloatPlainValuesReader extends PlainValuesReader {

@Override
public void skip() {
public void skip(int n) {
try {
in.skipBytes(4);
skipBytesFully(n * 4);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip float", e);
throw new ParquetDecodingException("could not skip " + n + " floats", e);
}
}

Expand All @@ -86,11 +98,11 @@ public float readFloat() {
public static class IntegerPlainValuesReader extends PlainValuesReader {

@Override
public void skip() {
public void skip(int n) {
try {
in.skipBytes(4);
in.skipBytes(n * 4);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip int", e);
throw new ParquetDecodingException("could not skip " + n + " ints", e);
}
}

Expand All @@ -107,11 +119,11 @@ public int readInteger() {
public static class LongPlainValuesReader extends PlainValuesReader {

@Override
public void skip() {
public void skip(int n) {
try {
in.skipBytes(8);
in.skipBytes(n * 8);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip long", e);
throw new ParquetDecodingException("could not skip " + n + " longs", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.parquet.column.values.rle;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;

Expand All @@ -43,4 +41,8 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
public void skip() {
// No-op: this reader does nothing when a value is skipped.
}

/**
 * No-op: this reader does nothing when values are skipped, whatever n is.
 *
 * @param n
 *          the number of values to be skipped (unused)
 */
@Override
public void skip(int n) {
}

}