Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ JNIEXPORT void
JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeDispose(
JNIEnv *, jclass, jlong, jlong);

/*
* Class: org_ros2_rcljava_subscription_SubscriptionImpl
* Method: nativeCreateEvent
* Signature: (JJ)J
*/
JNIEXPORT jlong
JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeCreateEvent(
JNIEnv *, jclass, jlong, jint);

#ifdef __cplusplus
}
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>

#include "rcl/error_handling.h"
#include "rcl/event.h"
#include "rcl/node.h"
#include "rcl/rcl.h"
#include "rmw/rmw.h"
Expand All @@ -29,6 +30,7 @@

#include "org_ros2_rcljava_subscription_SubscriptionImpl.h"

using rcljava_common::exceptions::rcljava_throw_exception;
using rcljava_common::exceptions::rcljava_throw_rclexception;

JNIEXPORT void JNICALL
Expand Down Expand Up @@ -61,3 +63,31 @@ Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeDispose(
rcljava_throw_rclexception(env, ret, msg);
}
}

JNIEXPORT jlong
JNICALL Java_org_ros2_rcljava_subscription_SubscriptionImpl_nativeCreateEvent(
JNIEnv * env, jclass, jlong subscription_handle, jint event_type)
{
auto * subscription = reinterpret_cast<rcl_subscription_t *>(subscription_handle);
if (!subscription) {
rcljava_throw_exception(
env, "java/lang/IllegalArgumentException", "passed rcl_subscription_t handle is NULL");
return 0;
}
auto * event = static_cast<rcl_event_t *>(malloc(sizeof(rcl_event_t)));
if (!event) {
rcljava_throw_exception(env, "java/lang/OutOfMemoryError", "failed to allocate rcl_event_t");
return 0;
}
*event = rcl_get_zero_initialized_event();
rcl_ret_t ret = rcl_subscription_event_init(
event, subscription, static_cast<rcl_subscription_event_type_t>(event_type));
if (RCL_RET_OK != ret) {
std::string msg = "Failed to create event: " + std::string(rcl_get_error_string().str);
rcl_reset_error();
rcljava_throw_rclexception(env, ret, msg);
free(event);
return 0;
}
return reinterpret_cast<jlong>(event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ protected void waitForWork(long timeout) {
for (Subscription<MessageDefinition> subscription : node.getNode().getSubscriptions()) {
this.subscriptionHandles.add(new AbstractMap.SimpleEntry<Long, Subscription>(
subscription.getHandle(), subscription));
Collection<EventHandler> eventHandlers = subscription.getEventHandlers();
for (EventHandler eventHandler : eventHandlers) {
this.eventHandles.add(new AbstractMap.SimpleEntry<Long, EventHandler>(
eventHandler.getHandle(), eventHandler));
}
}

for (Publisher publisher : node.getNode().getPublishers()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
package org.ros2.rcljava.subscription;

import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.function.Supplier;

import org.ros2.rcljava.consumers.Consumer;
import org.ros2.rcljava.events.EventHandler;
import org.ros2.rcljava.events.SubscriptionEventStatus;
import org.ros2.rcljava.interfaces.Disposable;
import org.ros2.rcljava.interfaces.MessageDefinition;
import org.ros2.rcljava.node.Node;
Expand All @@ -42,4 +46,30 @@ public interface Subscription<T extends MessageDefinition> extends Disposable {
WeakReference<Node> getNodeReference();

void executeCallback(T message);

/**
* Create an event handler.
*
* @param <T> A subscription event status type.
* @param factory A factory that can instantiate an event status of type T.
* @param callback Callback that will be called when the event is triggered.
*/
<T extends SubscriptionEventStatus> EventHandler<T, Subscription> createEventHandler(
Supplier<T> factory, Consumer<T> callback);

/**
* Remove a previously registered event handler.
*
* @param <T> A subscription event status type.
* @param eventHandler An event handler that was registered previously in this object.
*/
<T extends SubscriptionEventStatus> void removeEventHandler(
EventHandler<T, Subscription> eventHandler);

/**
* Get the event handlers that were registered in this Subscription.
*
* @return The registered event handlers.
*/
Collection<EventHandler> getEventHandlers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
package org.ros2.rcljava.subscription;

import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import org.ros2.rcljava.RCLJava;
import org.ros2.rcljava.common.JNIUtils;
import org.ros2.rcljava.consumers.Consumer;
import org.ros2.rcljava.events.EventHandler;
import org.ros2.rcljava.events.EventHandlerImpl;
import org.ros2.rcljava.events.SubscriptionEventStatus;
import org.ros2.rcljava.interfaces.MessageDefinition;
import org.ros2.rcljava.node.Node;

Expand Down Expand Up @@ -68,6 +74,8 @@ public class SubscriptionImpl<T extends MessageDefinition> implements Subscripti
*/
private final Consumer<T> callback;

private final Collection<EventHandler> eventHandlers;

/**
* Constructor.
*
Expand All @@ -90,6 +98,7 @@ public SubscriptionImpl(final WeakReference<Node> nodeReference, final long hand
this.messageType = messageType;
this.topic = topic;
this.callback = callback;
this.eventHandlers = new LinkedBlockingQueue<EventHandler>();
}

/**
Expand All @@ -113,6 +122,54 @@ public final WeakReference<Node> getNodeReference() {
return this.nodeReference;
}

/**
* {@inheritDoc}
*/
public final
<T extends SubscriptionEventStatus> EventHandler<T, Subscription>
createEventHandler(Supplier<T> factory, Consumer<T> callback) {
T status = factory.get();
long eventHandle = nativeCreateEvent(this.handle, status.getSubscriptionEventType());
EventHandler<T, Subscription> eventHandler = new EventHandlerImpl(
new WeakReference<Subscription>(this), eventHandle, factory, callback);
this.eventHandlers.add(eventHandler);
return eventHandler;
}

/**
* {@inheritDoc}
*/
public final
<T extends SubscriptionEventStatus> void removeEventHandler(
EventHandler<T, Subscription> eventHandler)
{
if (!this.eventHandlers.remove(eventHandler)) {
throw new IllegalArgumentException(
"The passed eventHandler wasn't created by this subscription");
}
eventHandler.dispose();
}

/**
* {@inheritDoc}
*/
public final
Collection<EventHandler> getEventHandlers() {
return this.eventHandlers;
}

/**
* Create a subscription event (rcl_event_t)
*
* The ownership of the created event handle will immediately be transferred to an
* @{link EventHandlerImpl}, that will be responsible of disposing it.
*
* @param handle A pointer to the underlying ROS 2 subscription structure.
* Must not be zero.
* @param eventType The rcl event type.
*/
private static native long nativeCreateEvent(long handle, int eventType);

/**
* Destroy a ROS2 subscription (rcl_subscription_t).
*
Expand All @@ -127,6 +184,10 @@ public final WeakReference<Node> getNodeReference() {
* {@inheritDoc}
*/
public final void dispose() {
for (EventHandler eventHandler : this.eventHandlers) {
eventHandler.dispose();
}
this.eventHandlers.clear();
Node node = this.nodeReference.get();
if (node != null) {
node.removeSubscription(this);
Expand Down