Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.codec;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A Helper class which use reflections to clean up DirectBuffer. It's implemented for
* better compatibility with both java8 and java9+, because the Cleaner class is moved to
* another place since java9+.
*/
public class CleanUtil {
private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
private static final Field CLEANER_FIELD;
private static final Method CLEAN_METHOD;

static {
ByteBuffer buf = null;
Field cleanerField = null;
Method cleanMethod = null;
try {
buf = ByteBuffer.allocateDirect(1);
cleanerField = buf.getClass().getDeclaredField("cleaner");
cleanerField.setAccessible(true);
Object cleaner = cleanerField.get(buf);
cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
} catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
logger.warn("Initialization failed for cleanerField or cleanMethod", e);
} finally {
clean(buf);
}
CLEANER_FIELD = cleanerField;
CLEAN_METHOD = cleanMethod;
}

public static void clean(ByteBuffer buffer) {
if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
return;
}
try {
Object cleaner = CLEANER_FIELD.get(buffer);
CLEAN_METHOD.invoke(cleaner);
} catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
// Ignore clean failure
logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,23 @@
* entire input in setInput and compresses it as one compressed block.
*/
public class SnappyCompressor implements Compressor {
private static final int initialBufferSize = 64 * 1024 * 1024;

// Buffer for compressed output. This buffer grows as necessary.
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);

// Buffer for uncompressed input. This buffer grows as necessary.
private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);

private long bytesRead = 0L;
private long bytesWritten = 0L;
private boolean finishCalled = false;

public SnappyCompressor() {
inputBuffer.limit(0);
outputBuffer.limit(0);
}

/**
* Fills specified buffer with compressed data. Returns actual number
* of bytes of compressed data. A return value of 0 indicates that
Expand All @@ -66,7 +73,9 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep
// There is uncompressed input, compress it now
int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
if (maxOutputSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
CleanUtil.clean(oldBuffer);
}
// Reset the previous outputBuffer
outputBuffer.clear();
Expand Down Expand Up @@ -97,7 +106,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
inputBuffer.rewind();
tmp.put(inputBuffer);
ByteBuffer oldBuffer = inputBuffer;
inputBuffer = tmp;
CleanUtil.clean(oldBuffer);
} else {
inputBuffer.limit(inputBuffer.position() + len);
}
Expand Down Expand Up @@ -146,6 +157,18 @@ public void reinit(Configuration c) {

@Override
public synchronized void reset() {
if (inputBuffer.capacity() > initialBufferSize) {
ByteBuffer oldBuffer = inputBuffer;
inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
CleanUtil.clean(oldBuffer);
}

if (outputBuffer.capacity() > initialBufferSize) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
CleanUtil.clean(oldBuffer);
}

finishCalled = false;
bytesRead = bytesWritten = 0;
inputBuffer.rewind();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@
import org.apache.parquet.Preconditions;

public class SnappyDecompressor implements Decompressor {
private static final int initialBufferSize = 64 * 1024 * 1024;

// Buffer for uncompressed output. This buffer grows as necessary.
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);

// Buffer for compressed input. This buffer grows as necessary.
private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);

private boolean finished;


public SnappyDecompressor() {
inputBuffer.limit(0);
outputBuffer.limit(0);
}

/**
* Fills specified buffer with uncompressed data. Returns actual number
* of bytes of uncompressed data. A return value of 0 indicates that
Expand All @@ -61,7 +68,9 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc
// There is compressed input, decompress it now.
int decompressedSize = Snappy.uncompressedLength(inputBuffer);
if (decompressedSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
CleanUtil.clean(oldBuffer);
}

// Reset the previous outputBuffer (i.e. set position to 0)
Expand Down Expand Up @@ -102,7 +111,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
inputBuffer.rewind();
newBuffer.put(inputBuffer);
inputBuffer = newBuffer;
ByteBuffer oldBuffer = inputBuffer;
inputBuffer = newBuffer;
CleanUtil.clean(oldBuffer);
} else {
inputBuffer.limit(inputBuffer.position() + len);
}
Expand Down Expand Up @@ -131,6 +142,18 @@ public synchronized boolean needsInput() {

@Override
public synchronized void reset() {
if (inputBuffer.capacity() > initialBufferSize) {
ByteBuffer oldBuffer = inputBuffer;
inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
CleanUtil.clean(oldBuffer);
}

if (outputBuffer.capacity() > initialBufferSize) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
CleanUtil.clean(oldBuffer);
}

finished = false;
inputBuffer.rewind();
outputBuffer.rewind();
Expand Down