diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ff2829d225..d4fbf0567a 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -85,6 +85,11 @@
       <artifactId>commons-pool</artifactId>
       <version>1.6</version>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>${fastutil.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.github.rdblue</groupId>
       <artifactId>brotli-codec</artifactId>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
index 8bf24b2ca5..82b1414f29 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
@@ -30,41 +30,83 @@
  * A Helper class which use reflections to clean up DirectBuffer. It's implemented for
  * better compatibility with both java8 and java9+, because the Cleaner class is moved to
  * another place since java9+.
+ *
+ * Strongly inspired by:
+ * https://github.com/apache/tomcat/blob/master/java/org/apache/tomcat/util/buf/ByteBufferUtils.java
  */
-public class CleanUtil {
+public class CleanUtil
+{
   private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
-  private static final Field CLEANER_FIELD;
-  private static final Method CLEAN_METHOD;
+
+  private static final Object unsafe;
+  private static final Method cleanerMethod;
+  private static final Method cleanMethod;
+  private static final Method invokeCleanerMethod;
+
+  private static final int majorVersion =
+      Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
 
   static {
-    ByteBuffer buf = null;
-    Field cleanerField = null;
-    Method cleanMethod = null;
-    try {
-      buf = ByteBuffer.allocateDirect(1);
-      cleanerField = buf.getClass().getDeclaredField("cleaner");
-      cleanerField.setAccessible(true);
-      Object cleaner = cleanerField.get(buf);
-      cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
-    } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
-      logger.warn("Initialization failed for cleanerField or cleanMethod", e);
-    } finally {
-      clean(buf);
+    final ByteBuffer tempBuffer = ByteBuffer.allocateDirect(0);
+    Method cleanerMethodLocal = null;
+    Method cleanMethodLocal = null;
+    Object unsafeLocal = null;
+    Method invokeCleanerMethodLocal = null;
+    if (majorVersion >= 9) {
+      try {
+        final Class clazz = Class.forName("sun.misc.Unsafe");
+        final Field theUnsafe = clazz.getDeclaredField("theUnsafe");
+        theUnsafe.setAccessible(true);
+        unsafeLocal = theUnsafe.get(null);
+        invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class);
+        invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer);
+      } catch (IllegalAccessException | IllegalArgumentException
+          | InvocationTargetException | NoSuchMethodException | SecurityException
+          | ClassNotFoundException | NoSuchFieldException e) {
+        logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
+        unsafeLocal = null;
+        invokeCleanerMethodLocal = null;
+      }
+    } else {
+      try {
+        cleanerMethodLocal = tempBuffer.getClass().getMethod("cleaner");
+        cleanerMethodLocal.setAccessible(true);
+        final Object cleanerObject = cleanerMethodLocal.invoke(tempBuffer);
+        cleanMethodLocal = cleanerObject.getClass().getMethod("clean");
+        cleanMethodLocal.invoke(cleanerObject);
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException |
+          IllegalArgumentException | InvocationTargetException e) {
+        logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
+        cleanerMethodLocal = null;
+        cleanMethodLocal = null;
+      }
     }
-    CLEANER_FIELD = cleanerField;
-    CLEAN_METHOD = cleanMethod;
+    cleanerMethod = cleanerMethodLocal;
+    cleanMethod = cleanMethodLocal;
+    unsafe = unsafeLocal;
+    invokeCleanerMethod = invokeCleanerMethodLocal;
   }
 
-  public static void clean(ByteBuffer buffer) {
-    if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
-      return;
-    }
-    try {
-      Object cleaner = CLEANER_FIELD.get(buffer);
-      CLEAN_METHOD.invoke(cleaner);
-    } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
-      // Ignore clean failure
-      logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
+  private CleanUtil() {
+    // Hide the default constructor since this is a utility class.
+  }
+
+  public static void cleanDirectBuffer(ByteBuffer buf) {
+    if (cleanMethod != null) {
+      try {
+        cleanMethod.invoke(cleanerMethod.invoke(buf));
+      } catch (IllegalAccessException | IllegalArgumentException
+          | InvocationTargetException | SecurityException e) {
+        logger.warn("Error while cleaning up the DirectBuffer", e);
+      }
+    } else if (invokeCleanerMethod != null) {
+      try {
+        invokeCleanerMethod.invoke(unsafe, buf);
+      } catch (IllegalAccessException | IllegalArgumentException
+          | InvocationTargetException | SecurityException e) {
+        logger.warn("Error while cleaning up the DirectBuffer", e);
+      }
     }
   }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index 4720c08445..1d2bf611d2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -68,7 +68,7 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep
       if (maxOutputSize > outputBuffer.capacity()) {
         ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
-        CleanUtil.clean(oldBuffer);
+        CleanUtil.cleanDirectBuffer(oldBuffer);
       }
       // Reset the previous outputBuffer
       outputBuffer.clear();
@@ -101,7 +101,7 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
       tmp.put(inputBuffer);
       ByteBuffer oldBuffer = inputBuffer;
       inputBuffer = tmp;
-      CleanUtil.clean(oldBuffer);
+      CleanUtil.cleanDirectBuffer(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -113,8 +113,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
 
   @Override
   public void end() {
-    CleanUtil.clean(inputBuffer);
-    CleanUtil.clean(outputBuffer);
+    CleanUtil.cleanDirectBuffer(inputBuffer);
+    CleanUtil.cleanDirectBuffer(outputBuffer);
   }
 
   @Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index c3da63f9c4..2e0c558930 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -63,7 +63,7 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc
       if (decompressedSize > outputBuffer.capacity()) {
         ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
-        CleanUtil.clean(oldBuffer);
+        CleanUtil.cleanDirectBuffer(oldBuffer);
       }
 
       // Reset the previous outputBuffer (i.e. set position to 0)
@@ -101,12 +101,12 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
     SnappyUtil.validateBuffer(buffer, off, len);
 
     if (inputBuffer.capacity() - inputBuffer.position() < len) {
-      ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+      final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       newBuffer.put(inputBuffer);
-      ByteBuffer oldBuffer = inputBuffer;
+      final ByteBuffer oldBuffer = inputBuffer;
       inputBuffer = newBuffer;
-      CleanUtil.clean(oldBuffer);
+      CleanUtil.cleanDirectBuffer(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -115,8 +115,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
 
   @Override
   public void end() {
-    CleanUtil.clean(inputBuffer);
-    CleanUtil.clean(outputBuffer);
+    CleanUtil.cleanDirectBuffer(inputBuffer);
+    CleanUtil.cleanDirectBuffer(outputBuffer);
  }
 
   @Override
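
For reference, the version dispatch that the patched CleanUtil performs at class-load time can be distilled into the standalone sketch below. The class and method names here (DirectBufferCleanerSketch, free) are illustrative only and are not part of the patch: on Java 9 and later the off-heap memory behind a direct ByteBuffer is released through sun.misc.Unsafe.invokeCleaner, while on Java 8 the buffer's cleaner()/clean() pair is invoked reflectively.

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

public class DirectBufferCleanerSketch {

  // Frees the native memory behind a direct ByteBuffer without waiting for GC.
  public static void free(ByteBuffer buffer) {
    if (buffer == null || !buffer.isDirect()) {
      return;
    }
    try {
      int major = Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
      if (major >= 9) {
        // Java 9+: sun.misc.Unsafe.invokeCleaner(ByteBuffer) performs the release.
        Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
        Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Object unsafe = theUnsafe.get(null);
        Method invokeCleaner = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
        invokeCleaner.invoke(unsafe, buffer);
      } else {
        // Java 8: call DirectByteBuffer.cleaner().clean() reflectively.
        Method cleaner = buffer.getClass().getMethod("cleaner");
        cleaner.setAccessible(true);
        Object cleanerObject = cleaner.invoke(buffer);
        Method clean = cleanerObject.getClass().getMethod("clean");
        clean.invoke(cleanerObject);
      }
    } catch (ReflectiveOperationException | RuntimeException e) {
      // Best effort, as in the patch: if reflection is blocked, leave the buffer to the GC.
    }
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocateDirect(1024);
    free(buf); // buf must not be used after this point
  }
}

As in the patch, cleanup failures are swallowed: a buffer that cannot be released explicitly is simply reclaimed later when it is garbage collected.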