diff --git a/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h b/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h index c94a7959..5522ac8d 100644 --- a/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h +++ b/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h @@ -134,6 +134,15 @@ JNIEXPORT void JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddTimer( JNIEnv *, jclass, jlong, jlong); +/* + * Class: org_ros2_rcljava_executors_BaseExecutor + * Method: nativeWaitSetAddEvent + * Signature: (JJ)V + */ +JNIEXPORT void +JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddEvent( + JNIEnv *, jclass, jlong, jlong); + /* * Class: org_ros2_rcljava_executors_BaseExecutor * Method: nativeWaitSetSubscriptionIsReady @@ -152,6 +161,15 @@ JNIEXPORT jboolean JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetTimerIsReady( JNIEnv *, jclass, jlong, jlong); +/* + * Class: org_ros2_rcljava_executors_BaseExecutor + * Method: nativeWaitSetEventIsReady + * Signature: (JJ)Z + */ +JNIEXPORT jboolean +JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetEventIsReady( + JNIEnv *, jclass, jlong, jlong); + /* * Class: org_ros2_rcljava_executors_BaseExecutor * Method: nativeWaitSetServiceIsReady diff --git a/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp b/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp index ad515de5..287a175f 100644 --- a/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp +++ b/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp @@ -276,6 +276,21 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddTimer( } } +JNIEXPORT void JNICALL +Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddEvent( + JNIEnv * env, jclass, jlong wait_set_handle, jlong event_handle) +{ + auto * wait_set = reinterpret_cast(wait_set_handle); + auto * event = reinterpret_cast(event_handle); + rcl_ret_t ret = rcl_wait_set_add_event(wait_set, event, nullptr); + if (ret != RCL_RET_OK) { + std::string msg = + "Failed to add event to wait set: " + std::string(rcl_get_error_string().str); + rcl_reset_error(); + rcljava_throw_rclexception(env, ret, msg); + } +} + JNIEXPORT jobject JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeTakeRequest( JNIEnv * env, jclass, jlong service_handle, jlong jrequest_from_java_converter_handle, @@ -435,6 +450,14 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetTimerIsReady( return wait_set->timers[index] != nullptr; } +JNIEXPORT jboolean JNICALL +Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetEventIsReady( + JNIEnv *, jclass, jlong wait_set_handle, jlong index) +{ + rcl_wait_set_t * wait_set = reinterpret_cast(wait_set_handle); + return wait_set->events[index] != nullptr; +} + JNIEXPORT jboolean JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetServiceIsReady( JNIEnv *, jclass, jlong wait_set_handle, jlong index) diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/AnyExecutable.java b/rcljava/src/main/java/org/ros2/rcljava/executors/AnyExecutable.java index f6ce97b9..dd3d3039 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/AnyExecutable.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/AnyExecutable.java @@ -16,6 +16,7 @@ package org.ros2.rcljava.executors; import org.ros2.rcljava.client.Client; +import org.ros2.rcljava.events.EventHandler; import org.ros2.rcljava.subscription.Subscription; import org.ros2.rcljava.service.Service; import org.ros2.rcljava.timer.Timer; @@ -25,4 +26,5 @@ public class AnyExecutable { public Subscription subscription; public Service service; public Client client; + public EventHandler eventHandler; } diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java index f00ddf75..e7c59619 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -31,11 +32,13 @@ import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.client.Client; import org.ros2.rcljava.common.JNIUtils; +import org.ros2.rcljava.events.EventHandler; import org.ros2.rcljava.executors.AnyExecutable; import org.ros2.rcljava.executors.Executor; import org.ros2.rcljava.interfaces.MessageDefinition; import org.ros2.rcljava.interfaces.ServiceDefinition; import org.ros2.rcljava.node.ComposableNode; +import org.ros2.rcljava.publisher.Publisher; import org.ros2.rcljava.service.RMWRequestId; import org.ros2.rcljava.service.Service; import org.ros2.rcljava.subscription.Subscription; @@ -64,6 +67,8 @@ public class BaseExecutor { private List> clientHandles = new ArrayList>(); + private List> eventHandles = new ArrayList>(); + protected void addNode(ComposableNode node) { this.nodes.add(node); } @@ -158,6 +163,11 @@ protected void executeAnyExecutable(AnyExecutable anyExecutable) { } clientHandles.remove(anyExecutable.client.getHandle()); } + + if (anyExecutable.eventHandler != null) { + anyExecutable.eventHandler.executeCallback(); + eventHandles.remove(anyExecutable.eventHandler.getHandle()); + } } protected void waitForWork(long timeout) { @@ -165,6 +175,7 @@ protected void waitForWork(long timeout) { this.timerHandles.clear(); this.serviceHandles.clear(); this.clientHandles.clear(); + this.eventHandles.clear(); for (ComposableNode node : this.nodes) { for (Subscription subscription : node.getNode().getSubscriptions()) { @@ -172,6 +183,14 @@ protected void waitForWork(long timeout) { subscription.getHandle(), subscription)); } + for (Publisher publisher : node.getNode().getPublishers()) { + Collection eventHandlers = publisher.getEventHandlers(); + for (EventHandler eventHandler : eventHandlers) { + this.eventHandles.add(new AbstractMap.SimpleEntry( + eventHandler.getHandle(), eventHandler)); + } + } + for (Timer timer : node.getNode().getTimers()) { this.timerHandles.add(new AbstractMap.SimpleEntry(timer.getHandle(), timer)); } @@ -191,6 +210,7 @@ protected void waitForWork(long timeout) { int timersSize = 0; int clientsSize = 0; int servicesSize = 0; + int eventsSize = this.eventHandles.size(); for (ComposableNode node : this.nodes) { subscriptionsSize += node.getNode().getSubscriptions().size(); @@ -205,7 +225,9 @@ protected void waitForWork(long timeout) { long waitSetHandle = nativeGetZeroInitializedWaitSet(); long contextHandle = RCLJava.getDefaultContext().getHandle(); - nativeWaitSetInit(waitSetHandle, contextHandle, subscriptionsSize, 0, timersSize, clientsSize, servicesSize, 0); + nativeWaitSetInit( + waitSetHandle, contextHandle, subscriptionsSize, 0, + timersSize, clientsSize, servicesSize, eventsSize); nativeWaitSetClear(waitSetHandle); @@ -225,6 +247,10 @@ protected void waitForWork(long timeout) { nativeWaitSetAddClient(waitSetHandle, entry.getKey()); } + for (Map.Entry entry : this.eventHandles) { + nativeWaitSetAddEvent(waitSetHandle, entry.getKey()); + } + nativeWait(waitSetHandle, timeout); for (int i = 0; i < this.subscriptionHandles.size(); ++i) { @@ -251,6 +277,12 @@ protected void waitForWork(long timeout) { } } + for (int i = 0; i < this.eventHandles.size(); ++i) { + if (!nativeWaitSetEventIsReady(waitSetHandle, i)) { + this.eventHandles.get(i).setValue(null); + } + } + Iterator> subscriptionIterator = this.subscriptionHandles.iterator(); while (subscriptionIterator.hasNext()) { @@ -284,6 +316,14 @@ protected void waitForWork(long timeout) { } } + Iterator> eventIterator = this.eventHandles.iterator(); + while (eventIterator.hasNext()) { + Map.Entry entry = eventIterator.next(); + if (entry.getValue() == null) { + eventIterator.remove(); + } + } + nativeDisposeWaitSet(waitSetHandle); } @@ -325,6 +365,14 @@ protected AnyExecutable getNextExecutable() { } } + for (Map.Entry entry : this.eventHandles) { + if (entry.getValue() != null) { + anyExecutable.eventHandler = entry.getValue(); + entry.setValue(null); + return anyExecutable; + } + } + return null; } @@ -405,6 +453,8 @@ private static native MessageDefinition nativeTake( private static native void nativeWaitSetAddTimer(long waitSetHandle, long timerHandle); + private static native void nativeWaitSetAddEvent(long waitSetHandle, long eventHandle); + private static native RMWRequestId nativeTakeRequest(long serviceHandle, long requestFromJavaConverterHandle, long requestToJavaConverterHandle, long requestDestructorHandle, MessageDefinition requestMessage); @@ -421,6 +471,8 @@ private static native RMWRequestId nativeTakeResponse(long clientHandle, private static native boolean nativeWaitSetTimerIsReady(long waitSetHandle, long index); + private static native boolean nativeWaitSetEventIsReady(long waitSetHandle, long index); + private static native boolean nativeWaitSetServiceIsReady(long waitSetHandle, long index); private static native boolean nativeWaitSetClientIsReady(long waitSetHandle, long index); diff --git a/rcljava/src/main/java/org/ros2/rcljava/publisher/Publisher.java b/rcljava/src/main/java/org/ros2/rcljava/publisher/Publisher.java index d21eece2..47c857ff 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/publisher/Publisher.java +++ b/rcljava/src/main/java/org/ros2/rcljava/publisher/Publisher.java @@ -16,6 +16,7 @@ package org.ros2.rcljava.publisher; import java.lang.ref.WeakReference; +import java.util.Collection; import java.util.function.Supplier; import org.ros2.rcljava.consumers.Consumer; @@ -64,4 +65,11 @@ EventHandler createEventHandler( */ void removeEventHandler( EventHandler eventHandler); + + /** + * Get the event handlers that were registered in this Publisher. + * + * @return The registered event handlers. + */ + Collection getEventHandlers(); } diff --git a/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java b/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java index 2bba3892..17018f87 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java @@ -139,7 +139,15 @@ void removeEventHandler( } /** - * Create a publisher event (rcl_event_t). + * {@inheritDoc} + */ + public final + Collection getEventHandlers() { + return this.eventHandlers; + } + + /** + * Create a publisher event (rcl_event_t) * * The ownership of the created event handle will immediately be transferred to an * @{link EventHandlerImpl}, that will be responsible of disposing it.