diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index dd01d060..23b3787b 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -34,7 +34,8 @@ use crate::{ IntoNodeSubscriptionCallback, LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState, - TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX, + TakerSubscription, TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState, + ENTITY_LIFECYCLE_MUTEX, }; /// A processing unit that can communicate with other nodes. See the API of @@ -785,6 +786,85 @@ impl NodeState { ) } + /// Creates a [`TakerSubscription`]. + /// + /// # Behavior + /// + /// This subscription uses no callback and calling [`spin`][1] on the + /// node's executor will have no effect, nor is it required to receive + /// messages. + /// + /// In order to receive messages, use [`take`][2] or one of its variants. + /// + /// ```no_run + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// use example_interfaces::msg::String as StringMsg; + /// + /// let subscription = + /// node.create_taker_subscription::("topic".keep_last(1))?; + /// + /// loop { + /// if let Some(msg) = subscription.take()? { + /// println!("{}", msg.data); + /// } + /// std::thread::sleep(std::time::Duration::from_millis(100)); + /// } + /// # Ok::<(), RclrsError>(()) + /// ``` + /// + /// [TakerSubscription]s can also be used in a [`WaitSet`][3] to wait for + /// messages from one or more subscriptions. + /// + /// ```no_run + /// # use rclrs::*; + /// # let context = Context::default(); + /// # let executor = context.create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// use std::sync::Arc; + /// use example_interfaces::msg::String as StringMsg; + /// + /// let subscription = + /// Arc::new(node.create_taker_subscription::("topic")?); + /// + /// // `_lifecycle` must be named to avoid being dropped, which would cause + /// // the waitable to be dropped from the WaitSet. + /// let (waitable, _lifecycle) = + /// Waitable::new(Box::new(Arc::clone(&subscription)), None); + /// + /// let mut waitset = WaitSet::new(&context)?; + /// waitset.add([waitable])?; + /// + /// loop { + /// waitset.wait(None, |_| Ok(()))?; + /// + /// if let Some(msg) = subscription.take()? { + /// println!("{}", msg.data); + /// } + /// } + /// # Ok::<(), RclrsError>(()) + /// ``` + /// + /// # Subscription Options + /// + /// See [`create_subscription`][4] for examples + /// of setting the subscription options. + /// + /// [1]: crate::Executor::spin + /// [2]: crate::TakerSubscription::take + /// [3]: crate::WaitSet + /// [4]: crate::NodeState::create_subscription + pub fn create_taker_subscription<'a, T>( + &self, + options: impl Into>, + ) -> Result, RclrsError> + where + T: Message, + { + TakerSubscription::create(options, &self.handle) + } + /// Creates a [`Subscription`] with an async callback. /// /// # Behavior diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index e57c542f..2c13d102 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -1,6 +1,7 @@ use std::{ any::Any, ffi::{CStr, CString}, + marker::PhantomData, sync::{Arc, Mutex, MutexGuard}, }; @@ -8,8 +9,8 @@ use rosidl_runtime_rs::{Message, RmwMessage}; use crate::{ error::ToResult, qos::QoSProfile, rcl_bindings::*, IntoPrimitiveOptions, Node, NodeHandle, - RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable, WaitableLifecycle, - WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX, + RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclReturnCode, RclrsError, Waitable, + WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX, }; mod any_subscription_callback; @@ -117,47 +118,9 @@ where node_handle: &Arc, commands: &Arc, ) -> Result, RclrsError> { - let SubscriptionOptions { topic, qos } = options.into(); let callback = Arc::new(Mutex::new(callback)); - // SAFETY: Getting a zero-initialized value is always safe. - let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() }; - let type_support = - ::RmwMsg::get_type_support() as *const rosidl_message_type_support_t; - let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { - err, - s: topic.into(), - })?; - - // SAFETY: No preconditions for this function. - let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() }; - rcl_subscription_options.qos = qos.into(); - - { - let rcl_node = node_handle.rcl_node.lock().unwrap(); - let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); - unsafe { - // SAFETY: - // * The rcl_subscription is zero-initialized as mandated by this function. - // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription. - // * The topic name and the options are copied by this function, so they can be dropped afterwards. - // * The entity lifecycle mutex is locked to protect against the risk of global - // variables in the rmw implementation being unsafely modified during cleanup. - rcl_subscription_init( - &mut rcl_subscription, - &*rcl_node, - type_support, - topic_c_string.as_ptr(), - &rcl_subscription_options, - ) - .ok()?; - } - } - - let handle = Arc::new(SubscriptionHandle { - rcl_subscription: Mutex::new(rcl_subscription), - node_handle: Arc::clone(node_handle), - }); + let handle = SubscriptionHandle::create::(options, node_handle)?; let (waitable, lifecycle) = Waitable::new( Box::new(SubscriptionExecutable { @@ -292,6 +255,52 @@ struct SubscriptionHandle { } impl SubscriptionHandle { + fn create<'a, T: Message>( + options: impl Into>, + node_handle: &Arc, + ) -> Result, RclrsError> { + let SubscriptionOptions { topic, qos } = options.into(); + + // SAFETY: Getting a zero-initialized value is always safe. + let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() }; + let type_support = + ::RmwMsg::get_type_support() as *const rosidl_message_type_support_t; + let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { + err, + s: topic.into(), + })?; + + // SAFETY: No preconditions for this function. + let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() }; + rcl_subscription_options.qos = qos.into(); + + { + let rcl_node = node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + // SAFETY: + // * The rcl_subscription is zero-initialized as mandated by this function. + // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription. + // * The topic name and the options are copied by this function, so they can be dropped afterwards. + // * The entity lifecycle mutex is locked to protect against the risk of global + // variables in the rmw implementation being unsafely modified during cleanup. + rcl_subscription_init( + &mut rcl_subscription, + &*rcl_node, + type_support, + topic_c_string.as_ptr(), + &rcl_subscription_options, + ) + .ok()?; + } + } + + Ok(Arc::new(Self { + rcl_subscription: Mutex::new(rcl_subscription), + node_handle: Arc::clone(node_handle), + })) + } + fn lock(&self) -> MutexGuard { self.rcl_subscription.lock().unwrap() } @@ -408,6 +417,109 @@ impl Drop for SubscriptionHandle { } } +/// Struct for receiving messages of type `T` without a callback. +/// +/// Create a subscription using [`NodeState::create_taker_subscription()`][1]. +/// +/// Calling [`spin`][2] on the node's executor will have no effect on this subscription. +/// +/// When a subscription is created, it may take some time to get "matched" with a corresponding +/// publisher. +/// +/// [1]: crate::NodeState::create_taker_subscription +/// [2]: crate::Executor::spin +pub struct TakerSubscription { + handle: Arc, + _phantom: PhantomData, +} + +impl TakerSubscription { + /// Used by [`Node`][crate::Node] to create a new taker subscription. + pub(crate) fn create<'a>( + options: impl Into>, + node_handle: &Arc, + ) -> Result { + let handle = SubscriptionHandle::create::(options, node_handle)?; + + Ok(Self { + handle, + _phantom: PhantomData, + }) + } + + /// Fetches a new message. + /// + /// When there is no new message, this will return `Ok(None)`. + pub fn take(&self) -> Result, RclrsError> { + self.take_with_info().map(|res| res.map(|msg| msg.0)) + } + + /// Fetches a new message and its associated [`MessageInfo`][1]. + /// + /// When there is no new message, this will return `Ok(None)`. + /// + /// [1]: crate::MessageInfo + pub fn take_with_info(&self) -> Result, RclrsError> { + match self.handle.take() { + Ok(msg) => Ok(Some(msg)), + Err(RclrsError::RclError { + code: RclReturnCode::SubscriptionTakeFailed, + .. + }) => Ok(None), + Err(e) => Err(e), + } + } + + /// Obtains a read-only handle to a message owned by the middleware. + /// + /// When there is no new message, this will return `Ok(None)`. + /// + /// This is the counterpart to [`Publisher::borrow_loaned_message()`][1]. See its documentation + /// for more information. + /// + /// [1]: crate::Publisher::borrow_loaned_message + pub fn take_loaned(&self) -> Result>, RclrsError> { + self.take_loaned_with_info().map(|res| res.map(|msg| msg.0)) + } + + /// Obtains a read-only handle to a message owned by the middleware and its associated + /// [`MessageInfo`][1]. + /// + /// When there is no new message, this will return `Ok(None)`. + /// + /// This is the counterpart to [`Publisher::borrow_loaned_message()`][2]. See its documentation + /// for more information. + /// + /// [1]: crate::MessageInfo + /// [2]: crate::Publisher::borrow_loaned_message + pub fn take_loaned_with_info( + &self, + ) -> Result, MessageInfo)>, RclrsError> { + match self.handle.take_loaned() { + Ok(msg) => Ok(Some(msg)), + Err(RclrsError::RclError { + code: RclReturnCode::SubscriptionTakeFailed, + .. + }) => Ok(None), + Err(e) => Err(e), + } + } +} + +impl RclPrimitive for Arc> { + unsafe fn execute(&mut self, _payload: &mut dyn Any) -> Result<(), RclrsError> { + Ok(()) + } + + fn kind(&self) -> RclPrimitiveKind { + RclPrimitiveKind::Subscription + } + + fn handle(&self) -> RclPrimitiveHandle { + RclPrimitiveHandle::Subscription(self.handle.lock()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -520,4 +632,44 @@ mod tests { assert!(start_time.elapsed() < std::time::Duration::from_secs(10)); } } + + #[test] + fn test_taker_subscription() -> Result<(), RclrsError> { + use crate::*; + use std::time::Duration; + + let context = Context::default(); + let executor = context.create_basic_executor(); + let node = executor.create_node("test_node_taker_subscription")?; + + let publisher = node.create_publisher::("test_topic")?; + let subscriber = Arc::new(node.create_taker_subscription::("test_topic")?); + + let (waitable, _lifecycle) = Waitable::new(Box::new(Arc::clone(&subscriber)), None); + let mut waitset = WaitSet::new(&context)?; + waitset.add([waitable])?; + + let timeout = Some(Duration::from_millis(100)); + + publisher.publish(msg::Empty::default())?; + waitset.wait(timeout, |_| Ok(()))?; + + assert!(subscriber.take()?.is_some()); + + assert!(subscriber.take()?.is_none()); + assert!(subscriber.take_with_info()?.is_none()); + + publisher.publish(msg::Empty::default())?; + waitset.wait(timeout, |_| Ok(()))?; + + assert!(subscriber.take_with_info()?.is_some()); + + assert!(subscriber.take()?.is_none()); + assert!(subscriber.take_with_info()?.is_none()); + + publisher.publish(msg::Empty::default())?; + waitset.wait(timeout, |_| Ok(()))?; + + Ok(()) + } }