Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddTimer(
JNIEnv *, jclass, jlong, jlong);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetAddEvent
* Signature: (JJ)V
*/
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddEvent(
JNIEnv *, jclass, jlong, jlong);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetSubscriptionIsReady
Expand All @@ -152,6 +161,15 @@ JNIEXPORT jboolean
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetTimerIsReady(
JNIEnv *, jclass, jlong, jlong);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetEventIsReady
* Signature: (JJ)Z
*/
JNIEXPORT jboolean
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetEventIsReady(
JNIEnv *, jclass, jlong, jlong);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetServiceIsReady
Expand Down
23 changes: 23 additions & 0 deletions rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddTimer(
}
}

JNIEXPORT void JNICALL
Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetAddEvent(
JNIEnv * env, jclass, jlong wait_set_handle, jlong event_handle)
{
auto * wait_set = reinterpret_cast<rcl_wait_set_t *>(wait_set_handle);
auto * event = reinterpret_cast<rcl_event_t *>(event_handle);
rcl_ret_t ret = rcl_wait_set_add_event(wait_set, event, nullptr);
if (ret != RCL_RET_OK) {
std::string msg =
"Failed to add event to wait set: " + std::string(rcl_get_error_string().str);
rcl_reset_error();
rcljava_throw_rclexception(env, ret, msg);
}
}

JNIEXPORT jobject JNICALL
Java_org_ros2_rcljava_executors_BaseExecutor_nativeTakeRequest(
JNIEnv * env, jclass, jlong service_handle, jlong jrequest_from_java_converter_handle,
Expand Down Expand Up @@ -435,6 +450,14 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetTimerIsReady(
return wait_set->timers[index] != nullptr;
}

JNIEXPORT jboolean JNICALL
Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetEventIsReady(
JNIEnv *, jclass, jlong wait_set_handle, jlong index)
{
rcl_wait_set_t * wait_set = reinterpret_cast<rcl_wait_set_t *>(wait_set_handle);
return wait_set->events[index] != nullptr;
}

JNIEXPORT jboolean JNICALL
Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetServiceIsReady(
JNIEnv *, jclass, jlong wait_set_handle, jlong index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.ros2.rcljava.executors;

import org.ros2.rcljava.client.Client;
import org.ros2.rcljava.events.EventHandler;
import org.ros2.rcljava.subscription.Subscription;
import org.ros2.rcljava.service.Service;
import org.ros2.rcljava.timer.Timer;
Expand All @@ -25,4 +26,5 @@ public class AnyExecutable {
public Subscription subscription;
public Service service;
public Client client;
public EventHandler eventHandler;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -31,11 +32,13 @@
import org.ros2.rcljava.RCLJava;
import org.ros2.rcljava.client.Client;
import org.ros2.rcljava.common.JNIUtils;
import org.ros2.rcljava.events.EventHandler;
import org.ros2.rcljava.executors.AnyExecutable;
import org.ros2.rcljava.executors.Executor;
import org.ros2.rcljava.interfaces.MessageDefinition;
import org.ros2.rcljava.interfaces.ServiceDefinition;
import org.ros2.rcljava.node.ComposableNode;
import org.ros2.rcljava.publisher.Publisher;
import org.ros2.rcljava.service.RMWRequestId;
import org.ros2.rcljava.service.Service;
import org.ros2.rcljava.subscription.Subscription;
Expand Down Expand Up @@ -64,6 +67,8 @@ public class BaseExecutor {

private List<Map.Entry<Long, Client>> clientHandles = new ArrayList<Map.Entry<Long, Client>>();

private List<Map.Entry<Long, EventHandler>> eventHandles = new ArrayList<Map.Entry<Long, EventHandler>>();

protected void addNode(ComposableNode node) {
this.nodes.add(node);
}
Expand Down Expand Up @@ -158,20 +163,34 @@ protected void executeAnyExecutable(AnyExecutable anyExecutable) {
}
clientHandles.remove(anyExecutable.client.getHandle());
}

if (anyExecutable.eventHandler != null) {
anyExecutable.eventHandler.executeCallback();
eventHandles.remove(anyExecutable.eventHandler.getHandle());
}
}

protected void waitForWork(long timeout) {
this.subscriptionHandles.clear();
this.timerHandles.clear();
this.serviceHandles.clear();
this.clientHandles.clear();
this.eventHandles.clear();

for (ComposableNode node : this.nodes) {
for (Subscription<MessageDefinition> subscription : node.getNode().getSubscriptions()) {
this.subscriptionHandles.add(new AbstractMap.SimpleEntry<Long, Subscription>(
subscription.getHandle(), subscription));
}

for (Publisher publisher : node.getNode().getPublishers()) {
Collection<EventHandler> eventHandlers = publisher.getEventHandlers();
for (EventHandler eventHandler : eventHandlers) {
this.eventHandles.add(new AbstractMap.SimpleEntry<Long, EventHandler>(
eventHandler.getHandle(), eventHandler));
}
}

for (Timer timer : node.getNode().getTimers()) {
this.timerHandles.add(new AbstractMap.SimpleEntry<Long, Timer>(timer.getHandle(), timer));
}
Expand All @@ -191,6 +210,7 @@ protected void waitForWork(long timeout) {
int timersSize = 0;
int clientsSize = 0;
int servicesSize = 0;
int eventsSize = this.eventHandles.size();

for (ComposableNode node : this.nodes) {
subscriptionsSize += node.getNode().getSubscriptions().size();
Expand All @@ -205,7 +225,9 @@ protected void waitForWork(long timeout) {

long waitSetHandle = nativeGetZeroInitializedWaitSet();
long contextHandle = RCLJava.getDefaultContext().getHandle();
nativeWaitSetInit(waitSetHandle, contextHandle, subscriptionsSize, 0, timersSize, clientsSize, servicesSize, 0);
nativeWaitSetInit(
waitSetHandle, contextHandle, subscriptionsSize, 0,
timersSize, clientsSize, servicesSize, eventsSize);

nativeWaitSetClear(waitSetHandle);

Expand All @@ -225,6 +247,10 @@ protected void waitForWork(long timeout) {
nativeWaitSetAddClient(waitSetHandle, entry.getKey());
}

for (Map.Entry<Long, EventHandler> entry : this.eventHandles) {
nativeWaitSetAddEvent(waitSetHandle, entry.getKey());
}

nativeWait(waitSetHandle, timeout);

for (int i = 0; i < this.subscriptionHandles.size(); ++i) {
Expand All @@ -251,6 +277,12 @@ protected void waitForWork(long timeout) {
}
}

for (int i = 0; i < this.eventHandles.size(); ++i) {
if (!nativeWaitSetEventIsReady(waitSetHandle, i)) {
this.eventHandles.get(i).setValue(null);
}
}

Iterator<Map.Entry<Long, Subscription>> subscriptionIterator =
this.subscriptionHandles.iterator();
while (subscriptionIterator.hasNext()) {
Expand Down Expand Up @@ -284,6 +316,14 @@ protected void waitForWork(long timeout) {
}
}

Iterator<Map.Entry<Long, EventHandler>> eventIterator = this.eventHandles.iterator();
while (eventIterator.hasNext()) {
Map.Entry<Long, EventHandler> entry = eventIterator.next();
if (entry.getValue() == null) {
eventIterator.remove();
}
}

nativeDisposeWaitSet(waitSetHandle);
}

Expand Down Expand Up @@ -325,6 +365,14 @@ protected AnyExecutable getNextExecutable() {
}
}

for (Map.Entry<Long, EventHandler> entry : this.eventHandles) {
if (entry.getValue() != null) {
anyExecutable.eventHandler = entry.getValue();
entry.setValue(null);
return anyExecutable;
}
}

return null;
}

Expand Down Expand Up @@ -405,6 +453,8 @@ private static native MessageDefinition nativeTake(

private static native void nativeWaitSetAddTimer(long waitSetHandle, long timerHandle);

private static native void nativeWaitSetAddEvent(long waitSetHandle, long eventHandle);

private static native RMWRequestId nativeTakeRequest(long serviceHandle,
long requestFromJavaConverterHandle, long requestToJavaConverterHandle,
long requestDestructorHandle, MessageDefinition requestMessage);
Expand All @@ -421,6 +471,8 @@ private static native RMWRequestId nativeTakeResponse(long clientHandle,

private static native boolean nativeWaitSetTimerIsReady(long waitSetHandle, long index);

private static native boolean nativeWaitSetEventIsReady(long waitSetHandle, long index);

private static native boolean nativeWaitSetServiceIsReady(long waitSetHandle, long index);

private static native boolean nativeWaitSetClientIsReady(long waitSetHandle, long index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.ros2.rcljava.publisher;

import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.function.Supplier;

import org.ros2.rcljava.consumers.Consumer;
Expand Down Expand Up @@ -64,4 +65,11 @@ <T extends PublisherEventStatus> EventHandler<T, Publisher> createEventHandler(
*/
<T extends PublisherEventStatus> void removeEventHandler(
EventHandler<T, Publisher> eventHandler);

/**
* Get the event handlers that were registered in this Publisher.
*
* @return The registered event handlers.
*/
Collection<EventHandler> getEventHandlers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,15 @@ <T extends PublisherEventStatus> void removeEventHandler(
}

/**
* Create a publisher event (rcl_event_t).
* {@inheritDoc}
*/
public final
Collection<EventHandler> getEventHandlers() {
return this.eventHandlers;
}

/**
* Create a publisher event (rcl_event_t)
*
* The ownership of the created event handle will immediately be transferred to an
* @{link EventHandlerImpl}, that will be responsible of disposing it.
Expand Down