Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions lib/src/main/java/org/apache/arrow/datafusion/AbstractProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.apache.arrow.datafusion;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

abstract class AbstractProxy implements AutoCloseable {
private static final Logger logger = LogManager.getLogger(AbstractProxy.class);
private final long pointer;
private final AtomicBoolean closed;
private final ConcurrentMap<Long, AbstractProxy> children;

protected AbstractProxy(long pointer) {
this.pointer = pointer;
logger.printf(Level.INFO, "Obtaining %s@%x", getClass().getSimpleName(), pointer);
this.closed = new AtomicBoolean(false);
this.children = new ConcurrentHashMap<>();
}

protected final void registerChild(AbstractProxy child) {
AbstractProxy old = children.putIfAbsent(child.getPointer(), child);
if (old != null) {
logger.warn("duplicated registry for {}: {}", child.getPointer(), old);
}
}

protected final boolean isClosed() {
return closed.get();
}

protected final long getPointer() {
return pointer;
}

abstract void doClose(long pointer) throws Exception;

@Override
public final void close() throws Exception {
if (closed.compareAndSet(false, true)) {
for (AbstractProxy child : children.values()) {
// detection to avoid cycle
if (!child.isClosed()) {
child.close();
}
}
logger.printf(Level.INFO, "Closing %s@%x", getClass().getSimpleName(), pointer);
doClose(pointer);
} else {
logger.warn("Failed to close {}, maybe already closed?", getPointer());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,14 @@

import java.util.concurrent.CompletableFuture;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class DefaultDataFrame implements DataFrame, AutoCloseable {

private static final Logger logger = LogManager.getLogger(DefaultDataFrame.class);
class DefaultDataFrame extends AbstractProxy implements DataFrame {

private final ExecutionContext context;
private final long pointer;

DefaultDataFrame(ExecutionContext context, long pointer) {
super(pointer);
this.context = context;
logger.printf(Level.INFO, "obtaining %x", pointer);
this.pointer = pointer;
}

@Override
public void close() throws Exception {
logger.printf(Level.INFO, "closing %x", pointer);
DataFrames.destroyDataFrame(pointer);
}

@Override
Expand All @@ -34,4 +21,9 @@ public ArrowReader getReader() {
public CompletableFuture<Void> show() {
throw new UnsupportedOperationException("show not implemented");
}

@Override
void doClose(long pointer) {
DataFrames.destroyDataFrame(pointer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class DefaultExecutionContext implements ExecutionContext {
class DefaultExecutionContext extends AbstractProxy implements ExecutionContext {

private static final Logger logger = LogManager.getLogger(DefaultExecutionContext.class);

Expand All @@ -22,13 +21,12 @@ public void onErrorMessage(long invocationId, String errorMessage) {
@Override
public DataFrame sql(String sql) {
long invocationId = ThreadLocalRandom.current().nextLong();
long dataFramePointerId = querySql(this, this.pointer, invocationId, sql);
long dataFramePointerId = querySql(this, getPointer(), invocationId, sql);
if (dataFramePointerId <= 0) {
throw getErrorForInvocation(invocationId);
} else {
DefaultDataFrame frame = new DefaultDataFrame(this, dataFramePointerId);
DefaultDataFrame absent = dataFrames.putIfAbsent(dataFramePointerId, frame);
assert null == absent : "got duplicated frame";
registerChild(frame);
return frame;
}
}
Expand All @@ -44,26 +42,18 @@ private RuntimeException getErrorForInvocation(long invocationId) {
return new RuntimeException(errorMessage);
}

@Override
public void close() throws Exception {
for (DefaultDataFrame frame : dataFrames.values()) {
frame.close();
}
runtime.close();
logger.printf(Level.INFO, "closing %x", pointer);
ExecutionContexts.destroyExecutionContext(pointer);
}

private final long pointer;
private final Runtime runtime;
private final TokioRuntime runtime;
private final ConcurrentMap<Long, String> errorMessageInbox;
private final ConcurrentMap<Long, DefaultDataFrame> dataFrames;

DefaultExecutionContext(long pointer) {
logger.printf(Level.INFO, "obtaining %x", pointer);
this.pointer = pointer;
this.runtime = Runtime.create();
super(pointer);
this.runtime = TokioRuntime.create();
registerChild(runtime);
this.errorMessageInbox = new ConcurrentHashMap<>();
this.dataFrames = new ConcurrentHashMap<>();
}

@Override
void doClose(long pointer) throws Exception {
ExecutionContexts.destroyExecutionContext(pointer);
}
}
11 changes: 1 addition & 10 deletions lib/src/main/java/org/apache/arrow/datafusion/Runtime.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
package org.apache.arrow.datafusion;

public interface Runtime extends AutoCloseable {

static Runtime create() {
long pointer = TokioRuntime.createTokioRuntime();
if (pointer <= 0) {
throw new IllegalStateException("failed to create runtime");
}
return new TokioRuntime(pointer);
}
}
public interface Runtime extends AutoCloseable {}
32 changes: 15 additions & 17 deletions lib/src/main/java/org/apache/arrow/datafusion/TokioRuntime.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package org.apache.arrow.datafusion;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

final class TokioRuntime implements Runtime {
final class TokioRuntime extends AbstractProxy implements Runtime {

TokioRuntime(long pointer) {
logger.printf(Level.INFO, "obtaining %x", pointer);
this.pointer = pointer;
super(pointer);
}

static native long createTokioRuntime();

static native void destroyTokioRuntime(long pointer);

private static final Logger logger = LogManager.getLogger(TokioRuntime.class);

private final long pointer;

@Override
public void close() throws Exception {
logger.printf(Level.INFO, "closing %x", pointer);
void doClose(long pointer) {
destroyTokioRuntime(pointer);
}

static TokioRuntime create() {
long pointer = TokioRuntime.createTokioRuntime();
if (pointer <= 0) {
throw new IllegalStateException("failed to create runtime");
}
return new TokioRuntime(pointer);
}

static native long createTokioRuntime();

static native void destroyTokioRuntime(long pointer);
}
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
/**
* This module contains a Java JNI binding to Apache Arrow Datafusion which is the query engine
* library to work with data in Arrow format.
*/
package org.apache.arrow.datafusion;