diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java new file mode 100644 index 0000000000..8bf24b2ca5 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.codec; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Helper class which use reflections to clean up DirectBuffer. It's implemented for + * better compatibility with both java8 and java9+, because the Cleaner class is moved to + * another place since java9+. + */ +public class CleanUtil { + private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class); + private static final Field CLEANER_FIELD; + private static final Method CLEAN_METHOD; + + static { + ByteBuffer buf = null; + Field cleanerField = null; + Method cleanMethod = null; + try { + buf = ByteBuffer.allocateDirect(1); + cleanerField = buf.getClass().getDeclaredField("cleaner"); + cleanerField.setAccessible(true); + Object cleaner = cleanerField.get(buf); + cleanMethod = cleaner.getClass().getDeclaredMethod("clean"); + } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) { + logger.warn("Initialization failed for cleanerField or cleanMethod", e); + } finally { + clean(buf); + } + CLEANER_FIELD = cleanerField; + CLEAN_METHOD = cleanMethod; + } + + public static void clean(ByteBuffer buffer) { + if (CLEANER_FIELD == null || CLEAN_METHOD == null) { + return; + } + try { + Object cleaner = CLEANER_FIELD.get(buffer); + CLEAN_METHOD.invoke(cleaner); + } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) { + // Ignore clean failure + logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index d0270ca7c1..b2a8e7f10e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -32,16 +32,23 @@ * entire input in setInput and compresses it as one compressed block. */ public class SnappyCompressor implements Compressor { + private static final int initialBufferSize = 64 * 1024 * 1024; + // Buffer for compressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); // Buffer for uncompressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); private long bytesRead = 0L; private long bytesWritten = 0L; private boolean finishCalled = false; + public SnappyCompressor() { + inputBuffer.limit(0); + outputBuffer.limit(0); + } + /** * Fills specified buffer with compressed data. Returns actual number * of bytes of compressed data. A return value of 0 indicates that @@ -66,7 +73,9 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep // There is uncompressed input, compress it now int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position()); if (maxOutputSize > outputBuffer.capacity()) { + ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(maxOutputSize); + CleanUtil.clean(oldBuffer); } // Reset the previous outputBuffer outputBuffer.clear(); @@ -97,7 +106,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) { ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len); inputBuffer.rewind(); tmp.put(inputBuffer); + ByteBuffer oldBuffer = inputBuffer; inputBuffer = tmp; + CleanUtil.clean(oldBuffer); } else { inputBuffer.limit(inputBuffer.position() + len); } @@ -146,6 +157,18 @@ public void reinit(Configuration c) { @Override public synchronized void reset() { + if (inputBuffer.capacity() > initialBufferSize) { + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + CleanUtil.clean(oldBuffer); + } + + if (outputBuffer.capacity() > initialBufferSize) { + ByteBuffer oldBuffer = outputBuffer; + outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + CleanUtil.clean(oldBuffer); + } + finishCalled = false; bytesRead = bytesWritten = 0; inputBuffer.rewind(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 190f8d5318..8a7f86d5ae 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -27,14 +27,21 @@ import org.apache.parquet.Preconditions; public class SnappyDecompressor implements Decompressor { + private static final int initialBufferSize = 64 * 1024 * 1024; + // Buffer for uncompressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); // Buffer for compressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); private boolean finished; - + + public SnappyDecompressor() { + inputBuffer.limit(0); + outputBuffer.limit(0); + } + /** * Fills specified buffer with uncompressed data. Returns actual number * of bytes of uncompressed data. A return value of 0 indicates that @@ -61,7 +68,9 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc // There is compressed input, decompress it now. int decompressedSize = Snappy.uncompressedLength(inputBuffer); if (decompressedSize > outputBuffer.capacity()) { + ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(decompressedSize); + CleanUtil.clean(oldBuffer); } // Reset the previous outputBuffer (i.e. set position to 0) @@ -102,7 +111,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) { ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len); inputBuffer.rewind(); newBuffer.put(inputBuffer); - inputBuffer = newBuffer; + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = newBuffer; + CleanUtil.clean(oldBuffer); } else { inputBuffer.limit(inputBuffer.position() + len); } @@ -131,6 +142,18 @@ public synchronized boolean needsInput() { @Override public synchronized void reset() { + if (inputBuffer.capacity() > initialBufferSize) { + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + CleanUtil.clean(oldBuffer); + } + + if (outputBuffer.capacity() > initialBufferSize) { + ByteBuffer oldBuffer = outputBuffer; + outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + CleanUtil.clean(oldBuffer); + } + finished = false; inputBuffer.rewind(); outputBuffer.rewind();