diff --git a/rcljava/src/main/java/org/ros2/rcljava/RCLJava.java b/rcljava/src/main/java/org/ros2/rcljava/RCLJava.java index 92c44f6f..eadbba7c 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/RCLJava.java +++ b/rcljava/src/main/java/org/ros2/rcljava/RCLJava.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.ros2.rcljava.client.Client; @@ -360,6 +361,34 @@ public static void spinSome(final ComposableNode composableNode) { getGlobalExecutor().removeNode(composableNode); } + public static void spinUntilComplete(final Node node, final Future future, long timeoutNs) { + ComposableNode composableNode = new ComposableNode() { + public Node getNode() { + return node; + } + }; + RCLJava.spinUntilComplete(composableNode, future, timeoutNs); + } + + public static void spinUntilComplete(final Node node, final Future future) + { + RCLJava.spinUntilComplete(node, future, -1); + } + + public static void spinUntilComplete( + final ComposableNode node, final Future future, long timeoutNs) + { + getGlobalExecutor().addNode(node); + getGlobalExecutor().spinUntilComplete(future, timeoutNs); + getGlobalExecutor().removeNode(node); + } + + public static void spinUntilComplete( + final ComposableNode node, final Future future) + { + RCLJava.spinUntilComplete(node, future, -1); + } + public static synchronized void shutdown() { cleanup(); if (RCLJava.defaultContext != null) { diff --git a/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java b/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java index 442aa8ec..f48a3892 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java @@ -87,7 +87,7 @@ public void accept(Future input) {} long sequenceNumber = nativeSendClientRequest( handle, request.getFromJavaConverterInstance(), request.getDestructorInstance(), request); - RCLFuture future = new RCLFuture(this.nodeReference); + RCLFuture future = new RCLFuture(); Map.Entry entry = new AbstractMap.SimpleEntry(callback, future); diff --git a/rcljava/src/main/java/org/ros2/rcljava/concurrent/RCLFuture.java b/rcljava/src/main/java/org/ros2/rcljava/concurrent/RCLFuture.java index 5b45ccc4..2b21891f 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/concurrent/RCLFuture.java +++ b/rcljava/src/main/java/org/ros2/rcljava/concurrent/RCLFuture.java @@ -15,6 +15,7 @@ package org.ros2.rcljava.concurrent; +import java.lang.Deprecated; import java.lang.ref.WeakReference; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -22,73 +23,50 @@ import java.util.concurrent.TimeoutException; import org.ros2.rcljava.RCLJava; -import org.ros2.rcljava.executors.Executor; import org.ros2.rcljava.node.Node; public class RCLFuture implements Future { private WeakReference nodeReference; private boolean done = false; private V value = null; - private Executor executor = null; - public RCLFuture(final WeakReference nodeReference) { - this.nodeReference = nodeReference; - } - - public RCLFuture(final Executor executor) { - this.executor = executor; - } + public RCLFuture() {} - public final V get() throws InterruptedException, ExecutionException { + public final synchronized V get() throws InterruptedException, ExecutionException { if(this.value != null) { return this.value; } while (RCLJava.ok() && !isDone()) { - if (executor != null) { - executor.spinOnce(); - } else { - Node node = nodeReference.get(); - if (node == null) { - return null; // TODO(esteve) do something - } - RCLJava.spinOnce(node); - } + this.wait(); } return this.value; } - public final V get(final long timeout, final TimeUnit unit) + public final synchronized V get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (isDone()) { return value; } - long endTime = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); - - long timeoutNS = TimeUnit.NANOSECONDS.convert(timeout, unit); + long endTime = System.nanoTime(); + long timeoutNS = unit.toNanos(timeout); if (timeoutNS > 0) { endTime += timeoutNS; } while (RCLJava.ok()) { - if (executor != null) { - executor.spinOnce(timeoutNS); - } else { - Node node = nodeReference.get(); - if (node == null) { - return null; // TODO(esteve) do something - } - RCLJava.spinOnce(node, timeoutNS); - } + this.wait(TimeUnit.NANOSECONDS.toMillis(timeoutNS), (int) (timeoutNS % 1000000l)); if (isDone()) { return value; } - long now = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + long now = System.nanoTime(); if (now >= endTime) { throw new TimeoutException(); + } else { + timeoutNS = endTime - now; } } throw new InterruptedException(); @@ -109,5 +87,6 @@ public final boolean cancel(final boolean mayInterruptIfRunning) { public final synchronized void set(final V value) { this.value = value; done = true; + this.notify(); } } diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java index 5c43b6c4..e500f9ac 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java @@ -15,6 +15,7 @@ package org.ros2.rcljava.executors; +import java.lang.Math; import java.lang.SuppressWarnings; import java.util.AbstractMap; import java.util.ArrayList; @@ -24,6 +25,7 @@ import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -439,6 +441,30 @@ private boolean maxDurationNotElapsed(long maxDurationNs, long startNs) { return false; } + public void spinUntilComplete(Future future, long maxDurationNs) { + long startNs = System.nanoTime(); + // only use a blocking call to waitForWork when maxDurationNs < 0 + long waitTimeout = -1; + if (maxDurationNs > 0) { + // We cannot be waiting for work forever, if not we're not going to respect the passed timeout. + // We can neither do a non-blocking call to waitForWork(), because if the future has not yet + // been completed it will result in a busy loop. + // Use an arbitrary timeout to relax cpu usage. + waitTimeout = Math.min(maxDurationNs / 10, 10000000 /* 1ms*/); + } + while (RCLJava.ok() && (maxDurationNs < 0 || maxDurationNotElapsed(maxDurationNs, startNs))) { + waitForWork(waitTimeout); + AnyExecutable anyExecutable = getNextExecutable(); + while (anyExecutable != null) { + executeAnyExecutable(anyExecutable); + if (future.isDone()) { + return; + } + anyExecutable = getNextExecutable(); + } + } + } + private void spinSomeImpl(long maxDurationNs, boolean exhaustive) { long startNs = System.nanoTime(); boolean workAvailable = false; diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/Executor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/Executor.java index fe9810c9..5268e531 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/Executor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/Executor.java @@ -15,6 +15,8 @@ package org.ros2.rcljava.executors; +import java.util.concurrent.Future; + import org.ros2.rcljava.node.ComposableNode; public interface Executor { @@ -26,6 +28,10 @@ public interface Executor { public void spinOnce(long timeout); + public void spinUntilComplete(Future future, long maxDurationNs); + + public void spinUntilComplete(Future future); + public void spinSome(); public void spinSome(long maxDurationNs); diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java index 1bd41e31..5e813701 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java @@ -17,6 +17,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.node.ComposableNode; @@ -55,6 +56,14 @@ public void spinOnce(long timeout) { this.baseExecutor.spinOnce(timeout); } + public void spinUntilComplete(Future future, long timeoutNs) { + this.baseExecutor.spinUntilComplete(future, timeoutNs); + } + + public void spinUntilComplete(Future future) { + this.baseExecutor.spinUntilComplete(future, -1); + } + public void spinSome() { this.spinSome(0); } diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java index 93b41d06..a55259d4 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java @@ -15,6 +15,8 @@ package org.ros2.rcljava.executors; +import java.util.concurrent.Future; + import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.node.ComposableNode; import org.ros2.rcljava.executors.BaseExecutor; @@ -38,6 +40,14 @@ public void spinOnce(long timeout) { this.baseExecutor.spinOnce(timeout); } + public void spinUntilComplete(Future future, long timeoutNs) { + this.baseExecutor.spinUntilComplete(future, timeoutNs); + } + + public void spinUntilComplete(Future future) { + this.baseExecutor.spinUntilComplete(future, -1); + } + public void spinSome() { this.spinSome(0); } diff --git a/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClient.java b/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClient.java index 5a7a74e7..86a766a8 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClient.java +++ b/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClient.java @@ -19,6 +19,7 @@ import java.util.concurrent.Future; import org.ros2.rcljava.consumers.Consumer; +import org.ros2.rcljava.node.Node; import org.ros2.rcljava.parameters.ParameterType; import org.ros2.rcljava.parameters.ParameterVariant; @@ -59,4 +60,6 @@ public Future> describeParameters( public Future> describeParameters( final List names, final Consumer>> callback); + + public Node getNode(); } \ No newline at end of file diff --git a/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClientImpl.java b/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClientImpl.java index 5de84b4f..a9ece715 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClientImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClientImpl.java @@ -104,8 +104,7 @@ public Future> getParameters(final List names) { public Future> getParameters( final List names, final Consumer>> callback) { - final RCLFuture> futureResult = - new RCLFuture>(new WeakReference(this.node)); + final RCLFuture> futureResult = new RCLFuture>(); final rcl_interfaces.srv.GetParameters_Request request = new rcl_interfaces.srv.GetParameters_Request(); request.setNames(names); @@ -141,8 +140,7 @@ public Future> getParameterTypes(final List names) { public Future> getParameterTypes( final List names, final Consumer>> callback) { - final RCLFuture> futureResult = - new RCLFuture>(new WeakReference(this.node)); + final RCLFuture> futureResult = new RCLFuture>(); final rcl_interfaces.srv.GetParameterTypes_Request request = new rcl_interfaces.srv.GetParameterTypes_Request(); request.setNames(names); @@ -178,8 +176,7 @@ public Future> setParameters( final List parameters, final Consumer>> callback) { final RCLFuture> futureResult = - new RCLFuture>( - new WeakReference(this.node)); + new RCLFuture>(); final rcl_interfaces.srv.SetParameters_Request request = new rcl_interfaces.srv.SetParameters_Request(); List requestParameters = @@ -188,7 +185,6 @@ public Future> setParameters( requestParameters.add(parameterVariant.toParameter()); } request.setParameters(requestParameters); - setParametersClient.asyncSendRequest( request, new Consumer>() { public void accept(final Future future) { @@ -216,7 +212,7 @@ public Future setParametersAtomically( final List parameters, final Consumer> callback) { final RCLFuture futureResult = - new RCLFuture(new WeakReference(this.node)); + new RCLFuture(); final rcl_interfaces.srv.SetParametersAtomically_Request request = new rcl_interfaces.srv.SetParametersAtomically_Request(); List requestParameters = @@ -253,7 +249,7 @@ public Future listParameters( public Future listParameters(final List prefixes, long depth, final Consumer> callback) { final RCLFuture futureResult = - new RCLFuture(new WeakReference(this.node)); + new RCLFuture(); final rcl_interfaces.srv.ListParameters_Request request = new rcl_interfaces.srv.ListParameters_Request(); request.setPrefixes(prefixes); @@ -286,8 +282,7 @@ public Future> describeParameters( final List names, final Consumer>> callback) { final RCLFuture> futureResult = - new RCLFuture>( - new WeakReference(this.node)); + new RCLFuture>(); final rcl_interfaces.srv.DescribeParameters_Request request = new rcl_interfaces.srv.DescribeParameters_Request(); request.setNames(names); @@ -309,4 +304,8 @@ public void accept(final Future }); return futureResult; } + + public Node getNode() { + return this.node; + } } diff --git a/rcljava/src/main/java/org/ros2/rcljava/parameters/client/SyncParametersClientImpl.java b/rcljava/src/main/java/org/ros2/rcljava/parameters/client/SyncParametersClientImpl.java index b8a0a323..5ee58cb8 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/parameters/client/SyncParametersClientImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/parameters/client/SyncParametersClientImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.client.Client; import org.ros2.rcljava.concurrent.RCLFuture; import org.ros2.rcljava.consumers.Consumer; @@ -33,24 +34,6 @@ import org.ros2.rcljava.parameters.ParameterVariant; public class SyncParametersClientImpl implements SyncParametersClient { - private static class ConsumerHelper implements Consumer> { - private RCLFuture resultFuture; - - public void accept(final Future future) { - T result = null; - try { - result = future.get(); - } catch (Exception e) { - // TODO(esteve): do something - } - this.resultFuture.set(result); - } - - public ConsumerHelper(RCLFuture resultFuture) { - this.resultFuture = resultFuture; - } - } - private Executor executor; public AsyncParametersClient asyncParametersClient; @@ -107,79 +90,54 @@ public SyncParametersClientImpl(final Executor executor, final Node node) this(executor, node, "", QoSProfile.PARAMETERS); } - public List getParameters(final List names) - throws InterruptedException, ExecutionException { + private T spinUntilComplete(Future future) + throws InterruptedException, ExecutionException + { if (executor != null) { - RCLFuture> future = new RCLFuture>(executor); - asyncParametersClient.getParameters( - names, new ConsumerHelper>(future)); - return future.get(); + executor.spinUntilComplete(future); } else { - return asyncParametersClient.getParameters(names, null).get(); + RCLJava.spinUntilComplete(this.asyncParametersClient.getNode(), future); } + return future.get(); + } + + public List getParameters(final List names) + throws InterruptedException, ExecutionException { + Future> future = asyncParametersClient.getParameters(names, null); + return spinUntilComplete(future); } public List getParameterTypes(final List names) throws InterruptedException, ExecutionException { - if (executor != null) { - RCLFuture> future = new RCLFuture>(executor); - asyncParametersClient.getParameterTypes( - names, new ConsumerHelper>(future)); - return future.get(); - } else { - return asyncParametersClient.getParameterTypes(names, null).get(); - } + Future> future = asyncParametersClient.getParameterTypes(names, null); + return spinUntilComplete(future); } public List setParameters( final List parameters) throws InterruptedException, ExecutionException { - if (executor != null) { - RCLFuture> future = - new RCLFuture>(executor); - asyncParametersClient.setParameters( - parameters, new ConsumerHelper>(future)); - return future.get(); - } else { - return asyncParametersClient.setParameters(parameters, null).get(); - } + Future> future = asyncParametersClient.setParameters( + parameters, null); + return spinUntilComplete(future); } public rcl_interfaces.msg.SetParametersResult setParametersAtomically( final List parameters) throws InterruptedException, ExecutionException { - if (executor != null) { - RCLFuture future = - new RCLFuture(executor); - asyncParametersClient.setParametersAtomically( - parameters, new ConsumerHelper(future)); - return future.get(); - } else { - return asyncParametersClient.setParametersAtomically(parameters, null).get(); - } + Future future = asyncParametersClient.setParametersAtomically( + parameters, null); + return spinUntilComplete(future); } public rcl_interfaces.msg.ListParametersResult listParameters( final List prefixes, long depth) throws InterruptedException, ExecutionException { - if (executor != null) { - RCLFuture future = - new RCLFuture(executor); - asyncParametersClient.listParameters( - prefixes, depth, new ConsumerHelper(future)); - return future.get(); - } else { - return asyncParametersClient.listParameters(prefixes, depth, null).get(); - } + Future future = asyncParametersClient.listParameters( + prefixes, depth, null); + return spinUntilComplete(future); } public List describeParameters(final List names) throws InterruptedException, ExecutionException { - if (executor != null) { - RCLFuture> future = - new RCLFuture>(executor); - asyncParametersClient.describeParameters( - names, new ConsumerHelper>(future)); - return future.get(); - } else { - return asyncParametersClient.describeParameters(names, null).get(); - } + Future> future = asyncParametersClient.describeParameters( + names, null); + return spinUntilComplete(future); } } \ No newline at end of file diff --git a/rcljava/src/test/java/org/ros2/rcljava/action/ActionServerTest.java b/rcljava/src/test/java/org/ros2/rcljava/action/ActionServerTest.java index a9f5d07f..2e49b6ad 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/action/ActionServerTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/action/ActionServerTest.java @@ -139,19 +139,9 @@ public test_msgs.action.Fibonacci_SendGoal_Response sendGoal(int order) throws E Future future = this.mockActionClient.sendGoalClient.asyncSendRequest(request); - test_msgs.action.Fibonacci_SendGoal_Response response = null; long startTime = System.nanoTime(); - while (RCLJava.ok() && !future.isDone()) { - this.executor.spinOnce(1); - response = future.get(100, TimeUnit.MILLISECONDS); - - // Check for timeout - long duration = System.nanoTime() - startTime; - if (TimeUnit.NANOSECONDS.toSeconds(duration) >= 5) { - break; - } - } - return response; + this.executor.spinUntilComplete(future, TimeUnit.SECONDS.toNanos(5)); + return future.get(); } @Test @@ -211,15 +201,7 @@ public final void testCancelGoal() throws Exception { // Wait for cancel response long startTime = System.nanoTime(); - while (RCLJava.ok() && !cancelResponseFuture.isDone()) { - this.executor.spinOnce(100000000); // timeout of 100 milliseconds - - // Check for timeout - long duration = System.nanoTime() - startTime; - if (TimeUnit.NANOSECONDS.toSeconds(duration) >= 5) { - break; - } - } + this.executor.spinUntilComplete(cancelResponseFuture, TimeUnit.SECONDS.toNanos(5)); assertEquals(true, cancelResponseFuture.isDone()); action_msgs.srv.CancelGoal_Response cancelResponse = cancelResponseFuture.get(); diff --git a/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java b/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java index f2961e5a..71e95218 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java @@ -37,6 +37,7 @@ import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.concurrent.RCLFuture; import org.ros2.rcljava.consumers.TriConsumer; +import org.ros2.rcljava.executors.Executor; import org.ros2.rcljava.node.Node; import org.ros2.rcljava.service.RMWRequestId; import org.ros2.rcljava.service.Service; @@ -97,7 +98,7 @@ public static void tearDownOnce() { @Test public final void testAdd() throws Exception { RCLFuture consumerFuture = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); TestClientConsumer clientConsumer = new TestClientConsumer(consumerFuture); @@ -115,7 +116,8 @@ public final void testAdd() throws Exception { Future responseFuture = client.asyncSendRequest(request); - rcljava.srv.AddTwoInts_Response response = responseFuture.get(10, TimeUnit.SECONDS); + RCLJava.spinUntilComplete(node, responseFuture, TimeUnit.SECONDS.toNanos(10)); + rcljava.srv.AddTwoInts_Response response = responseFuture.get(); // Check that the message was received by the service assertTrue(consumerFuture.isDone()); diff --git a/rcljava/src/test/java/org/ros2/rcljava/node/NodeTest.java b/rcljava/src/test/java/org/ros2/rcljava/node/NodeTest.java index d501b389..29597325 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/node/NodeTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/node/NodeTest.java @@ -226,7 +226,7 @@ public final void testPubSubStdString() throws Exception { node.createPublisher(std_msgs.msg.String.class, "test_topic_string"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription(std_msgs.msg.String.class, "test_topic_string", @@ -256,7 +256,7 @@ public final void testPubSubBoundedArrayNested() throws Exception { rcljava.msg.BoundedArrayNested.class, "test_topic_bounded_array_nested"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( @@ -298,7 +298,7 @@ public final void testPubSubBoundedArrayPrimitives() throws Exception { rcljava.msg.BoundedArrayPrimitives.class, "test_topic_bounded_array_primitives"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( @@ -371,7 +371,7 @@ public final void testPubSubBuiltins() throws Exception { rcljava.msg.Builtins.class, "test_topic_builtins"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription(rcljava.msg.Builtins.class, @@ -418,7 +418,7 @@ public final void testPubSubDynamicArrayNested() throws Exception { rcljava.msg.DynamicArrayNested.class, "test_topic_dynamic_array_nested"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( @@ -460,7 +460,7 @@ public final void testPubSubDynamicArrayPrimitives() throws Exception { rcljava.msg.DynamicArrayPrimitives.class, "test_topic_dynamic_array_primitives"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( @@ -533,7 +533,7 @@ public final void testPubSubEmpty() throws Exception { node.createPublisher(rcljava.msg.Empty.class, "test_topic_empty"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( rcljava.msg.Empty.class, "test_topic_empty", new TestConsumer(future)); @@ -561,7 +561,7 @@ public final void testPubSubFieldsWithSameType() throws Exception { rcljava.msg.FieldsWithSameType.class, "test_topic_fields_with_same_type"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( @@ -605,7 +605,7 @@ public final void testPubSubNested() throws Exception { node.createPublisher(rcljava.msg.Nested.class, "test_topic_nested"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription(rcljava.msg.Nested.class, "test_topic_nested", @@ -640,7 +640,7 @@ public final void testPubSubPrimitives() throws Exception { rcljava.msg.Primitives.class, "test_topic_primitives"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription(rcljava.msg.Primitives.class, @@ -671,7 +671,7 @@ public final void testPubSubStaticArrayNested() throws Exception { rcljava.msg.StaticArrayNested.class, "test_topic_static_array_nested"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription(rcljava.msg.StaticArrayNested.class, @@ -726,7 +726,7 @@ public final void testPubSubStaticArrayPrimitives() throws Exception { rcljava.msg.StaticArrayPrimitives.class, "test_topic_static_array_primitives"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription( @@ -800,7 +800,7 @@ public final void testPubUInt32() throws Exception { node.createPublisher(rcljava.msg.UInt32.class, "test_topic_uint32"); RCLFuture future = - new RCLFuture(new WeakReference(node)); + new RCLFuture(); Subscription subscription = node.createSubscription(rcljava.msg.UInt32.class, "test_topic_uint32", @@ -834,13 +834,13 @@ public final void testPubUInt32MultipleNodes() throws Exception { Publisher publisher = publisherNode.createPublisher( rcljava.msg.UInt32.class, "test_topic_multiple"); - RCLFuture futureOne = new RCLFuture(executor); + RCLFuture futureOne = new RCLFuture(); Subscription subscriptionOne = subscriptionNodeOne.createSubscription(rcljava.msg.UInt32.class, "test_topic_multiple", new TestConsumer(futureOne)); - RCLFuture futureTwo = new RCLFuture(executor); + RCLFuture futureTwo = new RCLFuture(); Subscription subscriptionTwo = subscriptionNodeTwo.createSubscription(rcljava.msg.UInt32.class, diff --git a/rcljava/src/test/java/org/ros2/rcljava/parameters/AsyncParametersClientTest.java b/rcljava/src/test/java/org/ros2/rcljava/parameters/AsyncParametersClientTest.java index df2f749f..38f3e718 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/parameters/AsyncParametersClientTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/parameters/AsyncParametersClientTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.concurrent.RCLFuture; @@ -113,19 +114,18 @@ public final void testSetParameters() throws Exception { new ParameterVariant("foo.second", 42), new ParameterVariant("foobar", true)}); RCLFuture> future = - new RCLFuture>( - new WeakReference(this.node)); + new RCLFuture>(); parametersClient.setParameters(parameters, new TestConsumer(future)); - List parameterNames = - Arrays.asList(new String[] {"foo", "bar", "baz", "foo.first", "foo.second", "foobar"}); - + RCLJava.spinUntilComplete(node, future); List setParametersResults = future.get(); assertEquals(6, setParametersResults.size()); for (rcl_interfaces.msg.SetParametersResult result : setParametersResults) { assertEquals(true, result.getSuccessful()); } + List parameterNames = + Arrays.asList(new String[] {"foo", "bar", "baz", "foo.first", "foo.second", "foobar"}); List results = node.getParameters(parameterNames); assertEquals(parameters, results); } @@ -142,10 +142,10 @@ public final void testGetParameters() throws Exception { List parameterNames = Arrays.asList(new String[] {"foo", "bar", "baz", "foo.first", "foo.second", "foobar"}); - RCLFuture> future = - new RCLFuture>(new WeakReference(this.node)); + RCLFuture> future = new RCLFuture>(); parametersClient.getParameters(parameterNames, new TestConsumer(future)); + RCLJava.spinUntilComplete(node, future); assertEquals(parameters, future.get()); } @@ -159,10 +159,11 @@ public final void testListParameters() throws Exception { node.setParameters(parameters); RCLFuture future = - new RCLFuture(new WeakReference(this.node)); + new RCLFuture(); parametersClient.listParameters( Arrays.asList(new String[] {"foo", "bar"}), 10, new TestConsumer(future)); + RCLJava.spinUntilComplete(node, future); assertArrayEquals(new String[] {"foo.first", "foo.second"}, future.get().getNames()); assertArrayEquals(new String[] {"foo"}, future.get().getPrefixes()); } @@ -177,8 +178,7 @@ public final void testDescribeParameters() throws Exception { node.setParameters(parameters); RCLFuture> future = - new RCLFuture>( - new WeakReference(this.node)); + new RCLFuture>(); parametersClient.describeParameters( Arrays.asList(new String[] {"foo", "bar"}), new TestConsumer(future)); @@ -188,6 +188,7 @@ public final void testDescribeParameters() throws Exception { rcl_interfaces.msg.ParameterType.PARAMETER_INTEGER), new rcl_interfaces.msg.ParameterDescriptor().setName("bar").setType( rcl_interfaces.msg.ParameterType.PARAMETER_STRING)}); + RCLJava.spinUntilComplete(node, future); assertEquals(expected, future.get()); } } diff --git a/rcljava/src/test/java/org/ros2/rcljava/timer/TimerTest.java b/rcljava/src/test/java/org/ros2/rcljava/timer/TimerTest.java index fbad664e..586eb667 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/timer/TimerTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/timer/TimerTest.java @@ -75,7 +75,7 @@ public final void testCreateWallTimer() throws Exception { int max_iterations = 4; Node node = RCLJava.createNode("test_timer_node"); - RCLFuture future = new RCLFuture(new WeakReference(node)); + RCLFuture future = new RCLFuture(); TimerCallback timerCallback = new TimerCallback(future, max_iterations); Timer timer = node.createWallTimer(250, TimeUnit.MILLISECONDS, timerCallback); assertNotEquals(0, timer.getHandle()); @@ -83,6 +83,7 @@ public final void testCreateWallTimer() throws Exception { assertEquals( TimeUnit.NANOSECONDS.convert(250, TimeUnit.MILLISECONDS), timer.getTimerPeriodNS()); + RCLJava.spinUntilComplete(node, future); boolean result = future.get(3, TimeUnit.SECONDS); assertTrue(result); assertEquals(4, timerCallback.getCounter()); @@ -97,7 +98,7 @@ public final void testCreateTimer() throws Exception { int max_iterations = 4; Node node = RCLJava.createNode("test_timer_node"); - RCLFuture future = new RCLFuture(new WeakReference(node)); + RCLFuture future = new RCLFuture(); TimerCallback timerCallback = new TimerCallback(future, max_iterations); Timer timer = node.createTimer(250, TimeUnit.MILLISECONDS, timerCallback); assertNotEquals(0, timer.getHandle()); @@ -105,6 +106,7 @@ public final void testCreateTimer() throws Exception { assertEquals( TimeUnit.NANOSECONDS.convert(250, TimeUnit.MILLISECONDS), timer.getTimerPeriodNS()); + RCLJava.spinUntilComplete(node, future); boolean result = future.get(3, TimeUnit.SECONDS); assertTrue(result); assertEquals(4, timerCallback.getCounter()); diff --git a/rcljava_common/cmake/Modules/JavaExtra.cmake b/rcljava_common/cmake/Modules/JavaExtra.cmake index 2c0b7d06..259345f7 100644 --- a/rcljava_common/cmake/Modules/JavaExtra.cmake +++ b/rcljava_common/cmake/Modules/JavaExtra.cmake @@ -106,11 +106,11 @@ function(ament_add_junit_tests TARGET_NAME) set(${TARGET_NAME}_jar_dependencies "${JUNIT_JAR}${SEPARATOR}${HAMCREST_JAR}") add_jar("${TARGET_NAME}_jar" - "${_source_files}" + SOURCES "${_source_files}" OUTPUT_NAME "${TARGET_NAME}" INCLUDE_JARS - "${ARG_INCLUDE_JARS}" + ${ARG_INCLUDE_JARS} "${JUNIT_JAR}" "${HAMCREST_JAR}" )