Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parquet-hadoop actually depends on the fastutil package, without this package I was unable to test the project in isolation.

<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>
<dependency>
<groupId>com.github.rdblue</groupId>
<artifactId>brotli-codec</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,83 @@
* A Helper class which use reflections to clean up DirectBuffer. It's implemented for
* better compatibility with both java8 and java9+, because the Cleaner class is moved to
* another place since java9+.
*
* Strongly inspired by:
* https://github.com/apache/tomcat/blob/master/java/org/apache/tomcat/util/buf/ByteBufferUtils.java
*/
public class CleanUtil {
public class CleanUtil
{
private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
private static final Field CLEANER_FIELD;
private static final Method CLEAN_METHOD;

private static final Object unsafe;
private static final Method cleanerMethod;
private static final Method cleanMethod;
private static final Method invokeCleanerMethod;

private static final int majorVersion =
Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);

static {
ByteBuffer buf = null;
Field cleanerField = null;
Method cleanMethod = null;
try {
buf = ByteBuffer.allocateDirect(1);
cleanerField = buf.getClass().getDeclaredField("cleaner");
cleanerField.setAccessible(true);
Object cleaner = cleanerField.get(buf);
cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
} catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
logger.warn("Initialization failed for cleanerField or cleanMethod", e);
} finally {
clean(buf);
final ByteBuffer tempBuffer = ByteBuffer.allocateDirect(0);
Method cleanerMethodLocal = null;
Method cleanMethodLocal = null;
Object unsafeLocal = null;
Method invokeCleanerMethodLocal = null;
if (majorVersion >= 9) {
try {
final Class<?> clazz = Class.forName("sun.misc.Unsafe");
final Field theUnsafe = clazz.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafeLocal = theUnsafe.get(null);
invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class);
invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer);
} catch (IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException
| ClassNotFoundException | NoSuchFieldException e) {
logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
unsafeLocal = null;
invokeCleanerMethodLocal = null;
}
} else {
try {
cleanerMethodLocal = tempBuffer.getClass().getMethod("cleaner");
cleanerMethodLocal.setAccessible(true);
final Object cleanerObject = cleanerMethodLocal.invoke(tempBuffer);
cleanMethodLocal = cleanerObject.getClass().getMethod("clean");
cleanMethodLocal.invoke(cleanerObject);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException |
IllegalArgumentException | InvocationTargetException e) {
logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
cleanerMethodLocal = null;
cleanMethodLocal = null;
}
}
CLEANER_FIELD = cleanerField;
CLEAN_METHOD = cleanMethod;
cleanerMethod = cleanerMethodLocal;
cleanMethod = cleanMethodLocal;
unsafe = unsafeLocal;
invokeCleanerMethod = invokeCleanerMethodLocal;
}

public static void clean(ByteBuffer buffer) {
if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
return;
}
try {
Object cleaner = CLEANER_FIELD.get(buffer);
CLEAN_METHOD.invoke(cleaner);
} catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
// Ignore clean failure
logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
private CleanUtil() {
// Hide the default constructor since this is a utility class.
}

public static void cleanDirectBuffer(ByteBuffer buf) {
if (cleanMethod != null) {
try {
cleanMethod.invoke(cleanerMethod.invoke(buf));
} catch (IllegalAccessException | IllegalArgumentException
| InvocationTargetException | SecurityException e) {
logger.warn("Error while cleaning up the DirectBuffer", e);
}
} else if (invokeCleanerMethod != null) {
try {
invokeCleanerMethod.invoke(unsafe, buf);
} catch (IllegalAccessException | IllegalArgumentException
| InvocationTargetException | SecurityException e) {
logger.warn("Error while cleaning up the DirectBuffer", e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep
if (maxOutputSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
CleanUtil.clean(oldBuffer);
CleanUtil.cleanDirectBuffer(oldBuffer);
}
// Reset the previous outputBuffer
outputBuffer.clear();
Expand Down Expand Up @@ -101,7 +101,7 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
tmp.put(inputBuffer);
ByteBuffer oldBuffer = inputBuffer;
inputBuffer = tmp;
CleanUtil.clean(oldBuffer);
CleanUtil.cleanDirectBuffer(oldBuffer);
} else {
inputBuffer.limit(inputBuffer.position() + len);
}
Expand All @@ -113,8 +113,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) {

@Override
public void end() {
CleanUtil.clean(inputBuffer);
CleanUtil.clean(outputBuffer);
CleanUtil.cleanDirectBuffer(inputBuffer);
CleanUtil.cleanDirectBuffer(outputBuffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc
if (decompressedSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
CleanUtil.clean(oldBuffer);
CleanUtil.cleanDirectBuffer(oldBuffer);
}

// Reset the previous outputBuffer (i.e. set position to 0)
Expand Down Expand Up @@ -101,12 +101,12 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
SnappyUtil.validateBuffer(buffer, off, len);

if (inputBuffer.capacity() - inputBuffer.position() < len) {
ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
inputBuffer.rewind();
newBuffer.put(inputBuffer);
ByteBuffer oldBuffer = inputBuffer;
final ByteBuffer oldBuffer = inputBuffer;
inputBuffer = newBuffer;
CleanUtil.clean(oldBuffer);
CleanUtil.cleanDirectBuffer(oldBuffer);
} else {
inputBuffer.limit(inputBuffer.position() + len);
}
Expand All @@ -115,8 +115,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) {

@Override
public void end() {
CleanUtil.clean(inputBuffer);
CleanUtil.clean(outputBuffer);
CleanUtil.cleanDirectBuffer(inputBuffer);
CleanUtil.cleanDirectBuffer(outputBuffer);
}

@Override
Expand Down