diff --git a/rcljava/include/org_ros2_rcljava_subscription_SubscriptionImpl.h b/rcljava/include/org_ros2_rcljava_subscription_SubscriptionImpl.h index afb3456c..1f788aef 100644 --- a/rcljava/include/org_ros2_rcljava_subscription_SubscriptionImpl.h +++ b/rcljava/include/org_ros2_rcljava_subscription_SubscriptionImpl.h @@ -29,6 +29,15 @@ JNIEXPORT void JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeDispose( JNIEnv *, jclass, jlong, jlong); +/* + * Class: org_ros2_rcljava_subscription_SubscriptionImpl + * Method: nativeCreateEvent + * Signature: (JJ)J + */ +JNIEXPORT jlong +JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeCreateEvent( + JNIEnv *, jclass, jlong, jint); + #ifdef __cplusplus } #endif diff --git a/rcljava/src/main/cpp/org_ros2_rcljava_subscription_SubscriptionImpl.cpp b/rcljava/src/main/cpp/org_ros2_rcljava_subscription_SubscriptionImpl.cpp index 151bc0a0..5c995f26 100644 --- a/rcljava/src/main/cpp/org_ros2_rcljava_subscription_SubscriptionImpl.cpp +++ b/rcljava/src/main/cpp/org_ros2_rcljava_subscription_SubscriptionImpl.cpp @@ -19,6 +19,7 @@ #include #include "rcl/error_handling.h" +#include "rcl/event.h" #include "rcl/node.h" #include "rcl/rcl.h" #include "rmw/rmw.h" @@ -29,6 +30,7 @@ #include "org_ros2_rcljava_subscription_SubscriptionImpl.h" +using rcljava_common::exceptions::rcljava_throw_exception; using rcljava_common::exceptions::rcljava_throw_rclexception; JNIEXPORT void JNICALL @@ -61,3 +63,31 @@ Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeDispose( rcljava_throw_rclexception(env, ret, msg); } } + +JNIEXPORT jlong +JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeCreateEvent( + JNIEnv * env, jclass, jlong subscription_handle, jint event_type) +{ + auto * subscription = reinterpret_cast(subscription_handle); + if (!subscription) { + rcljava_throw_exception( + env, "java/lang/IllegalArgumentException", "passed rcl_subscription_t handle is NULL"); + return 0; + } + auto * event = static_cast(malloc(sizeof(rcl_event_t))); + if (!event) { + rcljava_throw_exception(env, "java/lang/OutOfMemoryError", "failed to allocate rcl_event_t"); + return 0; + } + *event = rcl_get_zero_initialized_event(); + rcl_ret_t ret = rcl_subscription_event_init( + event, subscription, static_cast(event_type)); + if (RCL_RET_OK != ret) { + std::string msg = "Failed to create event: " + std::string(rcl_get_error_string().str); + rcl_reset_error(); + rcljava_throw_rclexception(env, ret, msg); + free(event); + return 0; + } + return reinterpret_cast(event); +} diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java index e7c59619..1ed9218e 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java @@ -181,6 +181,11 @@ protected void waitForWork(long timeout) { for (Subscription subscription : node.getNode().getSubscriptions()) { this.subscriptionHandles.add(new AbstractMap.SimpleEntry( subscription.getHandle(), subscription)); + Collection eventHandlers = subscription.getEventHandlers(); + for (EventHandler eventHandler : eventHandlers) { + this.eventHandles.add(new AbstractMap.SimpleEntry( + eventHandler.getHandle(), eventHandler)); + } } for (Publisher publisher : node.getNode().getPublishers()) { diff --git a/rcljava/src/main/java/org/ros2/rcljava/subscription/Subscription.java b/rcljava/src/main/java/org/ros2/rcljava/subscription/Subscription.java index c74cc9bd..732d8bbb 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/subscription/Subscription.java +++ b/rcljava/src/main/java/org/ros2/rcljava/subscription/Subscription.java @@ -16,8 +16,12 @@ package org.ros2.rcljava.subscription; import java.lang.ref.WeakReference; +import java.util.Collection; +import java.util.function.Supplier; import org.ros2.rcljava.consumers.Consumer; +import org.ros2.rcljava.events.EventHandler; +import org.ros2.rcljava.events.SubscriptionEventStatus; import org.ros2.rcljava.interfaces.Disposable; import org.ros2.rcljava.interfaces.MessageDefinition; import org.ros2.rcljava.node.Node; @@ -42,4 +46,30 @@ public interface Subscription extends Disposable { WeakReference getNodeReference(); void executeCallback(T message); + + /** + * Create an event handler. + * + * @param A subscription event status type. + * @param factory A factory that can instantiate an event status of type T. + * @param callback Callback that will be called when the event is triggered. + */ + EventHandler createEventHandler( + Supplier factory, Consumer callback); + + /** + * Remove a previously registered event handler. + * + * @param A subscription event status type. + * @param eventHandler An event handler that was registered previously in this object. + */ + void removeEventHandler( + EventHandler eventHandler); + + /** + * Get the event handlers that were registered in this Subscription. + * + * @return The registered event handlers. + */ + Collection getEventHandlers(); } diff --git a/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java b/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java index 47fce59a..34b8b79c 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java @@ -16,10 +16,16 @@ package org.ros2.rcljava.subscription; import java.lang.ref.WeakReference; +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Supplier; import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.common.JNIUtils; import org.ros2.rcljava.consumers.Consumer; +import org.ros2.rcljava.events.EventHandler; +import org.ros2.rcljava.events.EventHandlerImpl; +import org.ros2.rcljava.events.SubscriptionEventStatus; import org.ros2.rcljava.interfaces.MessageDefinition; import org.ros2.rcljava.node.Node; @@ -68,6 +74,8 @@ public class SubscriptionImpl implements Subscripti */ private final Consumer callback; + private final Collection eventHandlers; + /** * Constructor. * @@ -90,6 +98,7 @@ public SubscriptionImpl(final WeakReference nodeReference, final long hand this.messageType = messageType; this.topic = topic; this.callback = callback; + this.eventHandlers = new LinkedBlockingQueue(); } /** @@ -113,6 +122,54 @@ public final WeakReference getNodeReference() { return this.nodeReference; } + /** + * {@inheritDoc} + */ + public final + EventHandler + createEventHandler(Supplier factory, Consumer callback) { + T status = factory.get(); + long eventHandle = nativeCreateEvent(this.handle, status.getSubscriptionEventType()); + EventHandler eventHandler = new EventHandlerImpl( + new WeakReference(this), eventHandle, factory, callback); + this.eventHandlers.add(eventHandler); + return eventHandler; + } + + /** + * {@inheritDoc} + */ + public final + void removeEventHandler( + EventHandler eventHandler) + { + if (!this.eventHandlers.remove(eventHandler)) { + throw new IllegalArgumentException( + "The passed eventHandler wasn't created by this subscription"); + } + eventHandler.dispose(); + } + + /** + * {@inheritDoc} + */ + public final + Collection getEventHandlers() { + return this.eventHandlers; + } + + /** + * Create a subscription event (rcl_event_t) + * + * The ownership of the created event handle will immediately be transferred to an + * @{link EventHandlerImpl}, that will be responsible of disposing it. + * + * @param handle A pointer to the underlying ROS 2 subscription structure. + * Must not be zero. + * @param eventType The rcl event type. + */ + private static native long nativeCreateEvent(long handle, int eventType); + /** * Destroy a ROS2 subscription (rcl_subscription_t). * @@ -127,6 +184,10 @@ public final WeakReference getNodeReference() { * {@inheritDoc} */ public final void dispose() { + for (EventHandler eventHandler : this.eventHandlers) { + eventHandler.dispose(); + } + this.eventHandlers.clear(); Node node = this.nodeReference.get(); if (node != null) { node.removeSubscription(this);