diff --git a/lib/src/main/java/org/apache/arrow/datafusion/AbstractProxy.java b/lib/src/main/java/org/apache/arrow/datafusion/AbstractProxy.java new file mode 100644 index 0000000..10dcdbb --- /dev/null +++ b/lib/src/main/java/org/apache/arrow/datafusion/AbstractProxy.java @@ -0,0 +1,55 @@ +package org.apache.arrow.datafusion; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +abstract class AbstractProxy implements AutoCloseable { + private static final Logger logger = LogManager.getLogger(AbstractProxy.class); + private final long pointer; + private final AtomicBoolean closed; + private final ConcurrentMap children; + + protected AbstractProxy(long pointer) { + this.pointer = pointer; + logger.printf(Level.INFO, "Obtaining %s@%x", getClass().getSimpleName(), pointer); + this.closed = new AtomicBoolean(false); + this.children = new ConcurrentHashMap<>(); + } + + protected final void registerChild(AbstractProxy child) { + AbstractProxy old = children.putIfAbsent(child.getPointer(), child); + if (old != null) { + logger.warn("duplicated registry for {}: {}", child.getPointer(), old); + } + } + + protected final boolean isClosed() { + return closed.get(); + } + + protected final long getPointer() { + return pointer; + } + + abstract void doClose(long pointer) throws Exception; + + @Override + public final void close() throws Exception { + if (closed.compareAndSet(false, true)) { + for (AbstractProxy child : children.values()) { + // detection to avoid cycle + if (!child.isClosed()) { + child.close(); + } + } + logger.printf(Level.INFO, "Closing %s@%x", getClass().getSimpleName(), pointer); + doClose(pointer); + } else { + logger.warn("Failed to close {}, maybe already closed?", getPointer()); + } + } +} diff --git a/lib/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java b/lib/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java index 9147af8..0ea334b 100644 --- a/lib/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java +++ b/lib/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java @@ -2,27 +2,14 @@ import java.util.concurrent.CompletableFuture; import org.apache.arrow.vector.ipc.ArrowReader; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -class DefaultDataFrame implements DataFrame, AutoCloseable { - - private static final Logger logger = LogManager.getLogger(DefaultDataFrame.class); +class DefaultDataFrame extends AbstractProxy implements DataFrame { private final ExecutionContext context; - private final long pointer; DefaultDataFrame(ExecutionContext context, long pointer) { + super(pointer); this.context = context; - logger.printf(Level.INFO, "obtaining %x", pointer); - this.pointer = pointer; - } - - @Override - public void close() throws Exception { - logger.printf(Level.INFO, "closing %x", pointer); - DataFrames.destroyDataFrame(pointer); } @Override @@ -34,4 +21,9 @@ public ArrowReader getReader() { public CompletableFuture show() { throw new UnsupportedOperationException("show not implemented"); } + + @Override + void doClose(long pointer) { + DataFrames.destroyDataFrame(pointer); + } } diff --git a/lib/src/main/java/org/apache/arrow/datafusion/DefaultExecutionContext.java b/lib/src/main/java/org/apache/arrow/datafusion/DefaultExecutionContext.java index b486d46..9ae2c09 100644 --- a/lib/src/main/java/org/apache/arrow/datafusion/DefaultExecutionContext.java +++ b/lib/src/main/java/org/apache/arrow/datafusion/DefaultExecutionContext.java @@ -3,11 +3,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -class DefaultExecutionContext implements ExecutionContext { +class DefaultExecutionContext extends AbstractProxy implements ExecutionContext { private static final Logger logger = LogManager.getLogger(DefaultExecutionContext.class); @@ -22,13 +21,12 @@ public void onErrorMessage(long invocationId, String errorMessage) { @Override public DataFrame sql(String sql) { long invocationId = ThreadLocalRandom.current().nextLong(); - long dataFramePointerId = querySql(this, this.pointer, invocationId, sql); + long dataFramePointerId = querySql(this, getPointer(), invocationId, sql); if (dataFramePointerId <= 0) { throw getErrorForInvocation(invocationId); } else { DefaultDataFrame frame = new DefaultDataFrame(this, dataFramePointerId); - DefaultDataFrame absent = dataFrames.putIfAbsent(dataFramePointerId, frame); - assert null == absent : "got duplicated frame"; + registerChild(frame); return frame; } } @@ -44,26 +42,18 @@ private RuntimeException getErrorForInvocation(long invocationId) { return new RuntimeException(errorMessage); } - @Override - public void close() throws Exception { - for (DefaultDataFrame frame : dataFrames.values()) { - frame.close(); - } - runtime.close(); - logger.printf(Level.INFO, "closing %x", pointer); - ExecutionContexts.destroyExecutionContext(pointer); - } - - private final long pointer; - private final Runtime runtime; + private final TokioRuntime runtime; private final ConcurrentMap errorMessageInbox; - private final ConcurrentMap dataFrames; DefaultExecutionContext(long pointer) { - logger.printf(Level.INFO, "obtaining %x", pointer); - this.pointer = pointer; - this.runtime = Runtime.create(); + super(pointer); + this.runtime = TokioRuntime.create(); + registerChild(runtime); this.errorMessageInbox = new ConcurrentHashMap<>(); - this.dataFrames = new ConcurrentHashMap<>(); + } + + @Override + void doClose(long pointer) throws Exception { + ExecutionContexts.destroyExecutionContext(pointer); } } diff --git a/lib/src/main/java/org/apache/arrow/datafusion/Runtime.java b/lib/src/main/java/org/apache/arrow/datafusion/Runtime.java index b83e651..4455da5 100644 --- a/lib/src/main/java/org/apache/arrow/datafusion/Runtime.java +++ b/lib/src/main/java/org/apache/arrow/datafusion/Runtime.java @@ -1,12 +1,3 @@ package org.apache.arrow.datafusion; -public interface Runtime extends AutoCloseable { - - static Runtime create() { - long pointer = TokioRuntime.createTokioRuntime(); - if (pointer <= 0) { - throw new IllegalStateException("failed to create runtime"); - } - return new TokioRuntime(pointer); - } -} +public interface Runtime extends AutoCloseable {} diff --git a/lib/src/main/java/org/apache/arrow/datafusion/TokioRuntime.java b/lib/src/main/java/org/apache/arrow/datafusion/TokioRuntime.java index 95c23a2..8545ce3 100644 --- a/lib/src/main/java/org/apache/arrow/datafusion/TokioRuntime.java +++ b/lib/src/main/java/org/apache/arrow/datafusion/TokioRuntime.java @@ -1,27 +1,25 @@ package org.apache.arrow.datafusion; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -final class TokioRuntime implements Runtime { +final class TokioRuntime extends AbstractProxy implements Runtime { TokioRuntime(long pointer) { - logger.printf(Level.INFO, "obtaining %x", pointer); - this.pointer = pointer; + super(pointer); } - static native long createTokioRuntime(); - - static native void destroyTokioRuntime(long pointer); - - private static final Logger logger = LogManager.getLogger(TokioRuntime.class); - - private final long pointer; - @Override - public void close() throws Exception { - logger.printf(Level.INFO, "closing %x", pointer); + void doClose(long pointer) { destroyTokioRuntime(pointer); } + + static TokioRuntime create() { + long pointer = TokioRuntime.createTokioRuntime(); + if (pointer <= 0) { + throw new IllegalStateException("failed to create runtime"); + } + return new TokioRuntime(pointer); + } + + static native long createTokioRuntime(); + + static native void destroyTokioRuntime(long pointer); } diff --git a/lib/src/main/java/org/apache/arrow/datafusion/package-info.java b/lib/src/main/java/org/apache/arrow/datafusion/package-info.java index 87bc9e7..9da7f11 100644 --- a/lib/src/main/java/org/apache/arrow/datafusion/package-info.java +++ b/lib/src/main/java/org/apache/arrow/datafusion/package-info.java @@ -1 +1,5 @@ +/** + * This module contains a Java JNI binding to Apache Arrow Datafusion which is the query engine + * library to work with data in Arrow format. + */ package org.apache.arrow.datafusion;