Skip to content

Commit a2958e3

Browse files
committed
Add support for events in subscriptions (#13)
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent 163f66b commit a2958e3

File tree

5 files changed

+135
-0
lines changed

5 files changed

+135
-0
lines changed

rcljava/include/org_ros2_rcljava_subscription_SubscriptionImpl.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ JNIEXPORT void
2929
JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeDispose(
3030
JNIEnv *, jclass, jlong, jlong);
3131

32+
/*
33+
* Class: org_ros2_rcljava_subscription_SubscriptionImpl
34+
* Method: nativeCreateEvent
35+
* Signature: (JJ)J
36+
*/
37+
JNIEXPORT jlong
38+
JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeCreateEvent(
39+
JNIEnv *, jclass, jlong, jint);
40+
3241
#ifdef __cplusplus
3342
}
3443
#endif

rcljava/src/main/cpp/org_ros2_rcljava_subscription_SubscriptionImpl.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <string>
2020

2121
#include "rcl/error_handling.h"
22+
#include "rcl/event.h"
2223
#include "rcl/node.h"
2324
#include "rcl/rcl.h"
2425
#include "rmw/rmw.h"
@@ -29,6 +30,7 @@
2930

3031
#include "org_ros2_rcljava_subscription_SubscriptionImpl.h"
3132

33+
using rcljava_common::exceptions::rcljava_throw_exception;
3234
using rcljava_common::exceptions::rcljava_throw_rclexception;
3335

3436
JNIEXPORT void JNICALL
@@ -61,3 +63,31 @@ Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeDispose(
6163
rcljava_throw_rclexception(env, ret, msg);
6264
}
6365
}
66+
67+
JNIEXPORT jlong
68+
JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeCreateEvent(
69+
JNIEnv * env, jclass, jlong subscription_handle, jint event_type)
70+
{
71+
auto * subscription = reinterpret_cast<rcl_subscription_t *>(subscription_handle);
72+
if (!subscription) {
73+
rcljava_throw_exception(
74+
env, "java/lang/IllegalArgumentException", "passed rcl_subscription_t handle is NULL");
75+
return 0;
76+
}
77+
auto * event = static_cast<rcl_event_t *>(malloc(sizeof(rcl_event_t)));
78+
if (!event) {
79+
rcljava_throw_exception(env, "java/lang/OutOfMemoryError", "failed to allocate rcl_event_t");
80+
return 0;
81+
}
82+
*event = rcl_get_zero_initialized_event();
83+
rcl_ret_t ret = rcl_subscription_event_init(
84+
event, subscription, static_cast<rcl_subscription_event_type_t>(event_type));
85+
if (RCL_RET_OK != ret) {
86+
std::string msg = "Failed to create event: " + std::string(rcl_get_error_string().str);
87+
rcl_reset_error();
88+
rcljava_throw_rclexception(env, ret, msg);
89+
free(event);
90+
return 0;
91+
}
92+
return reinterpret_cast<jlong>(event);
93+
}

rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ protected void waitForWork(long timeout) {
181181
for (Subscription<MessageDefinition> subscription : node.getNode().getSubscriptions()) {
182182
this.subscriptionHandles.add(new AbstractMap.SimpleEntry<Long, Subscription>(
183183
subscription.getHandle(), subscription));
184+
Collection<EventHandler> eventHandlers = subscription.getEventHandlers();
185+
for (EventHandler eventHandler : eventHandlers) {
186+
this.eventHandles.add(new AbstractMap.SimpleEntry<Long, EventHandler>(
187+
eventHandler.getHandle(), eventHandler));
188+
}
184189
}
185190

186191
for (Publisher publisher : node.getNode().getPublishers()) {

rcljava/src/main/java/org/ros2/rcljava/subscription/Subscription.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
package org.ros2.rcljava.subscription;
1717

1818
import java.lang.ref.WeakReference;
19+
import java.util.Collection;
20+
import java.util.function.Supplier;
1921

2022
import org.ros2.rcljava.consumers.Consumer;
23+
import org.ros2.rcljava.events.EventHandler;
24+
import org.ros2.rcljava.events.SubscriptionEventStatus;
2125
import org.ros2.rcljava.interfaces.Disposable;
2226
import org.ros2.rcljava.interfaces.MessageDefinition;
2327
import org.ros2.rcljava.node.Node;
@@ -42,4 +46,30 @@ public interface Subscription<T extends MessageDefinition> extends Disposable {
4246
WeakReference<Node> getNodeReference();
4347

4448
void executeCallback(T message);
49+
50+
/**
51+
* Create an event handler.
52+
*
53+
* @param <T> A subscription event status type.
54+
* @param factory A factory that can instantiate an event status of type T.
55+
* @param callback Callback that will be called when the event is triggered.
56+
*/
57+
<T extends SubscriptionEventStatus> EventHandler<T, Subscription> createEventHandler(
58+
Supplier<T> factory, Consumer<T> callback);
59+
60+
/**
61+
* Remove a previously registered event handler.
62+
*
63+
* @param <T> A subscription event status type.
64+
* @param eventHandler An event handler that was registered previously in this object.
65+
*/
66+
<T extends SubscriptionEventStatus> void removeEventHandler(
67+
EventHandler<T, Subscription> eventHandler);
68+
69+
/**
70+
* Get the event handlers that were registered in this Subscription.
71+
*
72+
* @return The registered event handlers.
73+
*/
74+
Collection<EventHandler> getEventHandlers();
4575
}

rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,16 @@
1616
package org.ros2.rcljava.subscription;
1717

1818
import java.lang.ref.WeakReference;
19+
import java.util.Collection;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.function.Supplier;
1922

2023
import org.ros2.rcljava.RCLJava;
2124
import org.ros2.rcljava.common.JNIUtils;
2225
import org.ros2.rcljava.consumers.Consumer;
26+
import org.ros2.rcljava.events.EventHandler;
27+
import org.ros2.rcljava.events.EventHandlerImpl;
28+
import org.ros2.rcljava.events.SubscriptionEventStatus;
2329
import org.ros2.rcljava.interfaces.MessageDefinition;
2430
import org.ros2.rcljava.node.Node;
2531

@@ -68,6 +74,8 @@ public class SubscriptionImpl<T extends MessageDefinition> implements Subscripti
6874
*/
6975
private final Consumer<T> callback;
7076

77+
private final Collection<EventHandler> eventHandlers;
78+
7179
/**
7280
* Constructor.
7381
*
@@ -90,6 +98,7 @@ public SubscriptionImpl(final WeakReference<Node> nodeReference, final long hand
9098
this.messageType = messageType;
9199
this.topic = topic;
92100
this.callback = callback;
101+
this.eventHandlers = new LinkedBlockingQueue<EventHandler>();
93102
}
94103

95104
/**
@@ -113,6 +122,54 @@ public final WeakReference<Node> getNodeReference() {
113122
return this.nodeReference;
114123
}
115124

125+
/**
126+
* {@inheritDoc}
127+
*/
128+
public final
129+
<T extends SubscriptionEventStatus> EventHandler<T, Subscription>
130+
createEventHandler(Supplier<T> factory, Consumer<T> callback) {
131+
T status = factory.get();
132+
long eventHandle = nativeCreateEvent(this.handle, status.getSubscriptionEventType());
133+
EventHandler<T, Subscription> eventHandler = new EventHandlerImpl(
134+
new WeakReference<Subscription>(this), eventHandle, factory, callback);
135+
this.eventHandlers.add(eventHandler);
136+
return eventHandler;
137+
}
138+
139+
/**
140+
* {@inheritDoc}
141+
*/
142+
public final
143+
<T extends SubscriptionEventStatus> void removeEventHandler(
144+
EventHandler<T, Subscription> eventHandler)
145+
{
146+
if (!this.eventHandlers.remove(eventHandler)) {
147+
throw new IllegalArgumentException(
148+
"The passed eventHandler wasn't created by this subscription");
149+
}
150+
eventHandler.dispose();
151+
}
152+
153+
/**
154+
* {@inheritDoc}
155+
*/
156+
public final
157+
Collection<EventHandler> getEventHandlers() {
158+
return this.eventHandlers;
159+
}
160+
161+
/**
162+
* Create a subscription event (rcl_event_t)
163+
*
164+
* The ownership of the created event handle will immediately be transferred to an
165+
* @{link EventHandlerImpl}, that will be responsible of disposing it.
166+
*
167+
* @param handle A pointer to the underlying ROS 2 subscription structure.
168+
* Must not be zero.
169+
* @param eventType The rcl event type.
170+
*/
171+
private static native long nativeCreateEvent(long handle, int eventType);
172+
116173
/**
117174
* Destroy a ROS2 subscription (rcl_subscription_t).
118175
*
@@ -127,6 +184,10 @@ public final WeakReference<Node> getNodeReference() {
127184
* {@inheritDoc}
128185
*/
129186
public final void dispose() {
187+
for (EventHandler eventHandler : this.eventHandlers) {
188+
eventHandler.dispose();
189+
}
190+
this.eventHandlers.clear();
130191
Node node = this.nodeReference.get();
131192
if (node != null) {
132193
node.removeSubscription(this);

0 commit comments

Comments
 (0)