diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index af0429247..497bddf28 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -53,16 +53,16 @@ pub fn spin_once(node: &Node, timeout: Option) -> Result<(), RclrsErro &ctx, )?; - for live_subscription in &live_subscriptions { - wait_set.add_subscription(live_subscription.clone())?; + for live_subscription in live_subscriptions { + wait_set.add_subscription(live_subscription)?; } - for live_client in &live_clients { - wait_set.add_client(live_client.clone())?; + for live_client in live_clients { + wait_set.add_client(live_client)?; } - for live_service in &live_services { - wait_set.add_service(live_service.clone())?; + for live_service in live_services { + wait_set.add_service(live_service)?; } let ready_entities = wait_set.wait(timeout)?; diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 39dc23c1b..c3d554a6c 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -72,9 +72,9 @@ unsafe impl Send for rcl_node_t {} pub struct Node { rcl_node_mtx: Arc>, pub(crate) rcl_context_mtx: Arc>, - pub(crate) clients: Vec>, - pub(crate) services: Vec>, - pub(crate) subscriptions: Vec>, + pub(crate) clients: Vec>, + pub(crate) services: Vec>, + pub(crate) subscriptions: Vec>, _parameter_map: ParameterOverrideMap, } @@ -193,7 +193,7 @@ impl Node { { let client = Arc::new(crate::node::client::Client::::new(self, topic)?); self.clients - .push(Arc::downgrade(&client) as Weak); + .push(Arc::downgrade(&client) as Weak); Ok(client) } @@ -229,7 +229,7 @@ impl Node { self, topic, callback, )?); self.services - .push(Arc::downgrade(&service) as Weak); + .push(Arc::downgrade(&service) as Weak); Ok(service) } @@ -249,23 +249,23 @@ impl Node { { let subscription = Arc::new(Subscription::::new(self, topic, qos, callback)?); self.subscriptions - .push(Arc::downgrade(&subscription) as Weak); + .push(Arc::downgrade(&subscription) as Weak); Ok(subscription) } /// Returns the subscriptions that have not been dropped yet. - pub(crate) fn live_subscriptions(&self) -> Vec> { + pub(crate) fn live_subscriptions(&self) -> Vec> { self.subscriptions .iter() .filter_map(Weak::upgrade) .collect() } - pub(crate) fn live_clients(&self) -> Vec> { + pub(crate) fn live_clients(&self) -> Vec> { self.clients.iter().filter_map(Weak::upgrade).collect() } - pub(crate) fn live_services(&self) -> Vec> { + pub(crate) fn live_services(&self) -> Vec> { self.services.iter().filter_map(Weak::upgrade).collect() } diff --git a/rclrs/src/node/client.rs b/rclrs/src/node/client.rs index 0a576d3be..a76c1b178 100644 --- a/rclrs/src/node/client.rs +++ b/rclrs/src/node/client.rs @@ -1,77 +1,94 @@ -use crate::node::client::oneshot::Canceled; use futures::channel::oneshot; use std::boxed::Box; use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; -use crate::error::{RclReturnCode, ToResult}; -use crate::MessageCow; -use crate::Node; -use crate::{rcl_bindings::*, RclrsError}; +use crate::rcl_bindings::*; +use crate::{MessageCow, Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable}; -use parking_lot::{Mutex, MutexGuard}; +use parking_lot::Mutex; use rosidl_runtime_rs::Message; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_client_t {} -/// Internal struct used by clients. -pub struct ClientHandle { +type RequestValue = Box; + +type RequestId = i64; + +/// Main class responsible for sending requests to a ROS service. +pub struct Client +where + T: rosidl_runtime_rs::Service, +{ rcl_client_mtx: Mutex, rcl_node_mtx: Arc>, + requests: Mutex>>, + futures: Arc>>>, } -impl ClientHandle { - pub(crate) fn lock(&self) -> MutexGuard { - self.rcl_client_mtx.lock() - } -} - -impl Drop for ClientHandle { +impl Drop for Client +where + T: rosidl_runtime_rs::Service, +{ fn drop(&mut self) { - let handle = self.rcl_client_mtx.get_mut(); - let rcl_node_mtx = &mut *self.rcl_node_mtx.lock(); + let rcl_client = self.rcl_client_mtx.get_mut(); + let rcl_node = &mut *self.rcl_node_mtx.lock(); // SAFETY: No preconditions for this function unsafe { - rcl_client_fini(handle, rcl_node_mtx); + rcl_client_fini(rcl_client, rcl_node); } } } -impl From for RclrsError { - fn from(_: Canceled) -> Self { - RclrsError::RclError { - code: RclReturnCode::Error, - msg: None, - } +impl Waitable for Client +where + T: rosidl_runtime_rs::Service, +{ + unsafe fn add_to_wait_set(self: Arc, wait_set: &mut WaitSet) -> Result<(), RclrsError> { + // SAFETY: I'm not sure if it's required, but the client pointer will remain valid + // for as long as the wait set exists, because it's stored in self.clients. + // Passing in a null pointer for the third argument is explicitly allowed. + rcl_wait_set_add_client( + &mut wait_set.rcl_wait_set, + &*self.rcl_client_mtx.lock(), + std::ptr::null_mut(), + ) + .ok()?; + wait_set.clients.push(self); + Ok(()) } -} -/// Trait to be implemented by concrete Client structs. -/// -/// See [`Client`] for an example. -pub trait ClientBase: Send + Sync { - /// Internal function to get a reference to the `rcl` handle. - fn handle(&self) -> &ClientHandle; - /// Tries to take a new response and run the callback or future with it. - fn execute(&self) -> Result<(), RclrsError>; + fn execute(&self) -> Result<(), RclrsError> { + let (res, req_id) = match self.take_response() { + Ok((res, req_id)) => (res, req_id), + Err(RclrsError::RclError { + code: RclReturnCode::ClientTakeFailed, + .. + }) => { + // Spurious wakeup – this may happen even when a waitset indicated that this + // client was ready, so it shouldn't be an error. + return Ok(()); + } + Err(e) => return Err(e), + }; + let requests = &mut *self.requests.lock(); + let futures = &mut *self.futures.lock(); + if let Some(callback) = requests.remove(&req_id.sequence_number) { + callback(res); + } else if let Some(future) = futures.remove(&req_id.sequence_number) { + let _ = future.send(res); + } + Ok(()) + } } -type RequestValue = Box; - -type RequestId = i64; +/// A marker trait to distinguish `Client` waitables from other [`Waitable`]s. +pub trait ClientWaitable: Waitable {} -/// Main class responsible for sending requests to a ROS service. -pub struct Client -where - T: rosidl_runtime_rs::Service, -{ - pub(crate) handle: Arc, - requests: Mutex>>, - futures: Arc>>>, -} +impl ClientWaitable for Client where T: rosidl_runtime_rs::Service {} impl Client where @@ -110,13 +127,9 @@ where .ok()?; } - let handle = Arc::new(ClientHandle { - rcl_client_mtx: Mutex::new(rcl_client), - rcl_node_mtx: node.rcl_node_mtx.clone(), - }); - Ok(Self { - handle, + rcl_client_mtx: Mutex::new(rcl_client), + rcl_node_mtx: Arc::clone(&node.rcl_node_mtx), requests: Mutex::new(HashMap::new()), futures: Arc::new(Mutex::new( HashMap::>::new(), @@ -149,7 +162,7 @@ where unsafe { // SAFETY: The request type is guaranteed to match the client type by the type system. rcl_send_request( - &*self.handle.lock() as *const _, + &*self.rcl_client_mtx.lock() as *const _, rmw_message.as_ref() as *const ::RmwMsg as *mut _, &mut sequence_number, ) @@ -184,7 +197,7 @@ where unsafe { // SAFETY: The request type is guaranteed to match the client type by the type system. rcl_send_request( - &*self.handle.lock() as *const _, + &*self.rcl_client_mtx.lock() as *const _, rmw_message.as_ref() as *const ::RmwMsg as *mut _, &mut sequence_number, ) @@ -228,11 +241,11 @@ where type RmwMsg = <::Response as rosidl_runtime_rs::Message>::RmwMsg; let mut response_out = RmwMsg::::default(); - let handle = &*self.handle.lock(); + let rcl_client = &*self.rcl_client_mtx.lock(); unsafe { // SAFETY: The three pointers are valid/initialized rcl_take_response( - handle, + rcl_client, &mut request_id_out, &mut response_out as *mut RmwMsg as *mut _, ) @@ -241,35 +254,3 @@ where Ok((T::Response::from_rmw_message(response_out), request_id_out)) } } - -impl ClientBase for Client -where - T: rosidl_runtime_rs::Service, -{ - fn handle(&self) -> &ClientHandle { - &self.handle - } - - fn execute(&self) -> Result<(), RclrsError> { - let (res, req_id) = match self.take_response() { - Ok((res, req_id)) => (res, req_id), - Err(RclrsError::RclError { - code: RclReturnCode::ClientTakeFailed, - .. - }) => { - // Spurious wakeup – this may happen even when a waitset indicated that this - // client was ready, so it shouldn't be an error. - return Ok(()); - } - Err(e) => return Err(e), - }; - let requests = &mut *self.requests.lock(); - let futures = &mut *self.futures.lock(); - if let Some(callback) = requests.remove(&req_id.sequence_number) { - callback(res); - } else if let Some(future) = futures.remove(&req_id.sequence_number) { - let _ = future.send(res); - } - Ok(()) - } -} diff --git a/rclrs/src/node/service.rs b/rclrs/src/node/service.rs index e488406c5..5eff416cb 100644 --- a/rclrs/src/node/service.rs +++ b/rclrs/src/node/service.rs @@ -2,66 +2,98 @@ use std::boxed::Box; use std::ffi::CString; use std::sync::Arc; -use crate::error::{RclReturnCode, ToResult}; -use crate::Node; -use crate::{rcl_bindings::*, RclrsError}; +use crate::rcl_bindings::*; +use crate::{Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable}; use rosidl_runtime_rs::Message; use crate::node::publisher::MessageCow; -use parking_lot::{Mutex, MutexGuard}; +use parking_lot::Mutex; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_service_t {} -/// Internal struct used by services. -pub struct ServiceHandle { - handle: Mutex, - node_handle: Arc>, -} +type ServiceCallback = + Box Response + 'static + Send>; -impl ServiceHandle { - pub(crate) fn lock(&self) -> MutexGuard { - self.handle.lock() - } +/// Main class responsible for responding to requests sent by ROS clients. +pub struct Service +where + T: rosidl_runtime_rs::Service, +{ + rcl_service_mtx: Mutex, + rcl_node_mtx: Arc>, + /// The callback function that runs when a request was received. + pub callback: Mutex>, } -impl Drop for ServiceHandle { +impl Drop for Service +where + T: rosidl_runtime_rs::Service, +{ fn drop(&mut self) { - let handle = self.handle.get_mut(); - let node_handle = &mut *self.node_handle.lock(); + let rcl_service = self.rcl_service_mtx.get_mut(); + let rcl_node = &mut *self.rcl_node_mtx.lock(); // SAFETY: No preconditions for this function unsafe { - rcl_service_fini(handle, node_handle); + rcl_service_fini(rcl_service, rcl_node); } } } -/// Trait to be implemented by concrete Service structs. -/// -/// See [`Service`] for an example -pub trait ServiceBase: Send + Sync { - /// Internal function to get a reference to the `rcl` handle. - fn handle(&self) -> &ServiceHandle; - /// Tries to take a new request and run the callback with it. - fn execute(&self) -> Result<(), RclrsError>; -} - -type ServiceCallback = - Box Response + 'static + Send>; - -/// Main class responsible for responding to requests sent by ROS clients. -pub struct Service +impl Waitable for Service where T: rosidl_runtime_rs::Service, { - pub(crate) handle: Arc, - /// The callback function that runs when a request was received. - pub callback: Mutex>, + unsafe fn add_to_wait_set(self: Arc, wait_set: &mut WaitSet) -> Result<(), RclrsError> { + // SAFETY: I'm not sure if it's required, but the service pointer will remain valid + // for as long as the wait set exists, because it's stored in self.clients. + // Passing in a null pointer for the third argument is explicitly allowed. + rcl_wait_set_add_service( + &mut wait_set.rcl_wait_set, + &*self.rcl_service_mtx.lock(), + std::ptr::null_mut(), + ) + .ok()?; + wait_set.services.push(self); + Ok(()) + } + + fn execute(&self) -> Result<(), RclrsError> { + let (req, mut req_id) = match self.take_request() { + Ok((req, req_id)) => (req, req_id), + Err(RclrsError::RclError { + code: RclReturnCode::ServiceTakeFailed, + .. + }) => { + // Spurious wakeup – this may happen even when a waitset indicated that this + // service was ready, so it shouldn't be an error. + return Ok(()); + } + Err(e) => return Err(e), + }; + let res = (*self.callback.lock())(&req_id, req); + let rmw_message = ::into_rmw_message(res.into_cow()); + let rcl_service = &*self.rcl_service_mtx.lock(); + unsafe { + // SAFETY: The response type is guaranteed to match the service type by the type system. + rcl_send_response( + rcl_service, + &mut req_id, + rmw_message.as_ref() as *const ::RmwMsg as *mut _, + ) + } + .ok() + } } +/// A marker trait to distinguish `Service` waitables from other [`Waitable`]s. +pub trait ServiceWaitable: Waitable {} + +impl ServiceWaitable for Service where T: rosidl_runtime_rs::Service {} + impl Service where T: rosidl_runtime_rs::Service, @@ -100,13 +132,9 @@ where .ok()?; } - let handle = Arc::new(ServiceHandle { - handle: Mutex::new(service_handle), - node_handle: node.rcl_node_mtx.clone(), - }); - Ok(Self { - handle, + rcl_service_mtx: Mutex::new(service_handle), + rcl_node_mtx: Arc::clone(&node.rcl_node_mtx), callback: Mutex::new(Box::new(callback)), }) } @@ -141,11 +169,11 @@ where type RmwMsg = <::Request as rosidl_runtime_rs::Message>::RmwMsg; let mut request_out = RmwMsg::::default(); - let handle = &*self.handle.lock(); + let rcl_service = &*self.rcl_service_mtx.lock(); unsafe { // SAFETY: The three pointers are valid/initialized rcl_take_request( - handle, + rcl_service, &mut request_id_out, &mut request_out as *mut RmwMsg as *mut _, ) @@ -154,39 +182,3 @@ where Ok((T::Request::from_rmw_message(request_out), request_id_out)) } } - -impl ServiceBase for Service -where - T: rosidl_runtime_rs::Service, -{ - fn handle(&self) -> &ServiceHandle { - &self.handle - } - - fn execute(&self) -> Result<(), RclrsError> { - let (req, mut req_id) = match self.take_request() { - Ok((req, req_id)) => (req, req_id), - Err(RclrsError::RclError { - code: RclReturnCode::ServiceTakeFailed, - .. - }) => { - // Spurious wakeup – this may happen even when a waitset indicated that this - // service was ready, so it shouldn't be an error. - return Ok(()); - } - Err(e) => return Err(e), - }; - let res = (*self.callback.lock())(&req_id, req); - let rmw_message = ::into_rmw_message(res.into_cow()); - let handle = &*self.handle.lock(); - unsafe { - // SAFETY: The response type is guaranteed to match the service type by the type system. - rcl_send_response( - handle, - &mut req_id, - rmw_message.as_ref() as *const ::RmwMsg as *mut _, - ) - } - .ok() - } -} diff --git a/rclrs/src/node/subscription.rs b/rclrs/src/node/subscription.rs index 5732d4bc1..0be54b5b8 100644 --- a/rclrs/src/node/subscription.rs +++ b/rclrs/src/node/subscription.rs @@ -1,7 +1,6 @@ use crate::error::{RclReturnCode, ToResult}; -use crate::qos::QoSProfile; -use crate::Node; use crate::{rcl_bindings::*, RclrsError}; +use crate::{Node, QoSProfile, WaitSet, Waitable}; use std::boxed::Box; use std::ffi::CStr; @@ -11,43 +10,12 @@ use std::sync::Arc; use rosidl_runtime_rs::{Message, RmwMessage}; -use parking_lot::{Mutex, MutexGuard}; +use parking_lot::Mutex; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_subscription_t {} -/// Internal struct used by subscriptions. -pub struct SubscriptionHandle { - rcl_subscription_mtx: Mutex, - rcl_node_mtx: Arc>, -} - -impl SubscriptionHandle { - pub(crate) fn lock(&self) -> MutexGuard { - self.rcl_subscription_mtx.lock() - } -} - -impl Drop for SubscriptionHandle { - fn drop(&mut self) { - let rcl_subscription = self.rcl_subscription_mtx.get_mut(); - let rcl_node = &mut *self.rcl_node_mtx.lock(); - // SAFETY: No preconditions for this function (besides the arguments being valid). - unsafe { - rcl_subscription_fini(rcl_subscription, rcl_node); - } - } -} - -/// Trait to be implemented by concrete [`Subscription`]s. -pub trait SubscriptionBase: Send + Sync { - /// Internal function to get a reference to the `rcl` handle. - fn handle(&self) -> &SubscriptionHandle; - /// Tries to take a new message and run the callback with it. - fn execute(&self) -> Result<(), RclrsError>; -} - /// Struct for receiving messages of type `T`. /// /// There can be multiple subscriptions for the same topic, in different nodes or the same node. @@ -63,12 +31,65 @@ pub struct Subscription where T: Message, { - pub(crate) handle: Arc, + rcl_subscription_mtx: Mutex, + rcl_node_mtx: Arc>, /// The callback function that runs when a message was received. pub callback: Mutex>, message: PhantomData, } +impl Drop for Subscription { + fn drop(&mut self) { + let rcl_subscription = self.rcl_subscription_mtx.get_mut(); + let rcl_node = &mut *self.rcl_node_mtx.lock(); + // SAFETY: No preconditions for this function (besides the arguments being valid). + unsafe { + rcl_subscription_fini(rcl_subscription, rcl_node); + } + } +} + +impl Waitable for Subscription +where + T: Message, +{ + unsafe fn add_to_wait_set(self: Arc, wait_set: &mut WaitSet) -> Result<(), RclrsError> { + // SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid + // for as long as the wait set exists, because it's stored in self.subscriptions. + // Passing in a null pointer for the third argument is explicitly allowed. + rcl_wait_set_add_subscription( + &mut wait_set.rcl_wait_set, + &*self.rcl_subscription_mtx.lock(), + std::ptr::null_mut(), + ) + .ok()?; + wait_set.subscriptions.push(self); + Ok(()) + } + + fn execute(&self) -> Result<(), RclrsError> { + let msg = match self.take() { + Ok(msg) => msg, + Err(RclrsError::RclError { + code: RclReturnCode::SubscriptionTakeFailed, + .. + }) => { + // Spurious wakeup – this may happen even when a waitset indicated that this + // subscription was ready, so it shouldn't be an error. + return Ok(()); + } + Err(e) => return Err(e), + }; + (*self.callback.lock())(msg); + Ok(()) + } +} + +/// A marker trait to distinguish `Subscription` waitables from other [`Waitable`]s. +pub trait SubscriptionWaitable: Waitable {} + +impl SubscriptionWaitable for Subscription where T: Message {} + impl Subscription where T: Message, @@ -113,13 +134,9 @@ where .ok()?; } - let handle = Arc::new(SubscriptionHandle { - rcl_subscription_mtx: Mutex::new(rcl_subscription), - rcl_node_mtx: node.rcl_node_mtx.clone(), - }); - Ok(Self { - handle, + rcl_subscription_mtx: Mutex::new(rcl_subscription), + rcl_node_mtx: Arc::clone(&node.rcl_node_mtx), callback: Mutex::new(Box::new(callback)), message: PhantomData, }) @@ -133,7 +150,8 @@ where // SAFETY: No preconditions for the function used // The unsafe variables get converted to safe types before being returned unsafe { - let raw_topic_pointer = rcl_subscription_get_topic_name(&*self.handle.lock()); + let raw_topic_pointer = + rcl_subscription_get_topic_name(&*self.rcl_subscription_mtx.lock()); CStr::from_ptr(raw_topic_pointer) .to_string_lossy() .into_owned() @@ -164,7 +182,7 @@ where // ``` pub fn take(&self) -> Result { let mut rmw_message = ::RmwMsg::default(); - let rcl_subscription = &mut *self.handle.lock(); + let rcl_subscription = &mut *self.rcl_subscription_mtx.lock(); unsafe { // SAFETY: The first two pointers are valid/initialized, and do not need to be valid // beyond the function call. @@ -181,32 +199,6 @@ where } } -impl SubscriptionBase for Subscription -where - T: Message, -{ - fn handle(&self) -> &SubscriptionHandle { - &self.handle - } - - fn execute(&self) -> Result<(), RclrsError> { - let msg = match self.take() { - Ok(msg) => msg, - Err(RclrsError::RclError { - code: RclReturnCode::SubscriptionTakeFailed, - .. - }) => { - // Spurious wakeup – this may happen even when a waitset indicated that this - // subscription was ready, so it shouldn't be an error. - return Ok(()); - } - Err(e) => return Err(e), - }; - (*self.callback.lock())(msg); - Ok(()) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index a20089222..07f8ff3a4 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -17,7 +17,7 @@ use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult}; use crate::rcl_bindings::*; -use crate::{ClientBase, Context, ServiceBase, SubscriptionBase}; +use crate::{ClientWaitable, Context, ServiceWaitable, SubscriptionWaitable}; use std::sync::Arc; use std::time::Duration; @@ -25,27 +25,53 @@ use std::vec::Vec; use parking_lot::Mutex; +/// Trait to be implemented by entities that can be waited on, like a [`Subscription`][1]. +/// +/// [1]: crate::Subscription +pub trait Waitable: Send + Sync { + /// Adds itself to the given wait set. + /// + /// This will return an error if the number of waitables of this kind in the wait set is larger + /// than the capacity set in [`WaitSet::new`]. + /// + /// # Safety + /// + /// The same waitable must not be added to multiple wait sets, because that would make it + /// unsafe to simultaneously wait on those wait sets. Quoting from the rcl docs: + /// "This function is thread-safe for unique wait sets with unique contents. + /// This function cannot operate on the same wait set in multiple threads, and + /// the wait sets may not share content. + /// For example, calling `rcl_wait()` in two threads on two different wait sets + /// that both contain a single, shared guard condition is undefined behavior." + /// + /// This function is unsafe because the implementation must ensure that the item is not yet in a + /// different wait set. + unsafe fn add_to_wait_set(self: Arc, wait_set: &mut WaitSet) -> Result<(), RclrsError>; + /// Tries to take a new message and run the callback with it. + fn execute(&self) -> Result<(), RclrsError>; +} + /// A struct for waiting on subscriptions and other waitable entities to become ready. pub struct WaitSet { - rcl_wait_set: rcl_wait_set_t, + pub(crate) rcl_wait_set: rcl_wait_set_t, // Used to ensure the context is alive while the wait set is alive. _rcl_context_mtx: Arc>, // The subscriptions that are currently registered in the wait set. // This correspondence is an invariant that must be maintained by all functions, // even in the error case. - subscriptions: Vec>, - clients: Vec>, - services: Vec>, + pub(crate) subscriptions: Vec>, + pub(crate) clients: Vec>, + pub(crate) services: Vec>, } /// A list of entities that are ready, returned by [`WaitSet::wait`]. pub struct ReadyEntities { /// A list of subscriptions that have potentially received messages. - pub subscriptions: Vec>, + pub subscriptions: Vec>, /// A list of clients that have potentially received responses. - pub clients: Vec>, + pub clients: Vec>, /// A list of services that have potentially received requests. - pub services: Vec>, + pub services: Vec>, } impl Drop for rcl_wait_set_t { @@ -61,8 +87,8 @@ impl Drop for rcl_wait_set_t { impl WaitSet { /// Creates a new wait set. /// - /// The given number of subscriptions is a capacity, corresponding to how often - /// [`WaitSet::add_subscription`] may be called. + /// The given numbers are capacities, corresponding to how often + /// [`Waitable::add_to_wait_set`] may be called. pub fn new( number_of_subscriptions: usize, number_of_guard_conditions: usize, @@ -100,50 +126,6 @@ impl WaitSet { }) } - /// Removes all entities from the wait set. - /// - /// This effectively resets the wait set to the state it was in after being created by - /// [`WaitSet::new`]. - pub fn clear(&mut self) { - self.subscriptions.clear(); - self.clients.clear(); - self.services.clear(); - // This cannot fail – the rcl_wait_set_clear function only checks that the input handle is - // valid, which it always is in our case. Hence, only debug_assert instead of returning - // Result. - // SAFETY: No preconditions for this function (besides passing in a valid wait set). - let ret = unsafe { rcl_wait_set_clear(&mut self.rcl_wait_set) }; - debug_assert_eq!(ret, 0); - } - - /// Adds a subscription to the wait set. - /// - /// It is possible, but not useful, to add the same subscription twice. - /// - /// This will return an error if the number of subscriptions in the wait set is larger than the - /// capacity set in [`WaitSet::new`]. - /// - /// The same subscription must not be added to multiple wait sets, because that would make it - /// unsafe to simultaneously wait on those wait sets. - pub fn add_subscription( - &mut self, - subscription: Arc, - ) -> Result<(), RclrsError> { - unsafe { - // SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid - // for as long as the wait set exists, because it's stored in self.subscriptions. - // Passing in a null pointer for the third argument is explicitly allowed. - rcl_wait_set_add_subscription( - &mut self.rcl_wait_set, - &*subscription.handle().lock(), - std::ptr::null_mut(), - ) - } - .ok()?; - self.subscriptions.push(subscription); - Ok(()) - } - /// Adds a client to the wait set. /// /// It is possible, but not useful, to add the same client twice. @@ -153,20 +135,10 @@ impl WaitSet { /// /// The same client must not be added to multiple wait sets, because that would make it /// unsafe to simultaneously wait on those wait sets. - pub fn add_client(&mut self, client: Arc) -> Result<(), RclrsError> { - unsafe { - // SAFETY: I'm not sure if it's required, but the client pointer will remain valid - // for as long as the wait set exists, because it's stored in self.clients. - // Passing in a null pointer for the third argument is explicitly allowed. - rcl_wait_set_add_client( - &mut self.rcl_wait_set, - &*client.handle().lock() as *const _, - core::ptr::null_mut(), - ) - } - .ok()?; - self.clients.push(client); - Ok(()) + pub fn add_client(&mut self, client: Arc) -> Result<(), RclrsError> { + // SAFETY: The implementation of this trait for clients checks that the client + // has not already been added to a different wait set. + unsafe { client.add_to_wait_set(self) } } /// Adds a service to the wait set. @@ -178,20 +150,44 @@ impl WaitSet { /// /// The same service must not be added to multiple wait sets, because that would make it /// unsafe to simultaneously wait on those wait sets. - pub fn add_service(&mut self, service: Arc) -> Result<(), RclrsError> { - unsafe { - // SAFETY: I'm not sure if it's required, but the service pointer will remain valid - // for as long as the wait set exists, because it's stored in self.services. - // Passing in a null pointer for the third argument is explicitly allowed. - rcl_wait_set_add_service( - &mut self.rcl_wait_set, - &*service.handle().lock() as *const _, - core::ptr::null_mut(), - ) - } - .ok()?; - self.services.push(service); - Ok(()) + pub fn add_service(&mut self, service: Arc) -> Result<(), RclrsError> { + // SAFETY: The implementation of this trait for services checks that the service + // has not already been added to a different wait set. + unsafe { service.add_to_wait_set(self) } + } + + /// Adds a subscription to the wait set. + /// + /// It is possible, but not useful, to add the same subscription twice. + /// + /// This will return an error if the number of subscriptions in the wait set is larger than the + /// capacity set in [`WaitSet::new`]. + /// + /// The same subscription must not be added to multiple wait sets, because that would make it + /// unsafe to simultaneously wait on those wait sets. + pub fn add_subscription( + &mut self, + subscription: Arc, + ) -> Result<(), RclrsError> { + // SAFETY: The implementation of this trait for subscriptions checks that the subscription + // has not already been added to a different wait set. + unsafe { subscription.add_to_wait_set(self) } + } + + /// Removes all entities from the wait set. + /// + /// This effectively resets the wait set to the state it was in after being created by + /// [`WaitSet::new`]. + pub fn clear(&mut self) { + self.subscriptions.clear(); + self.clients.clear(); + self.services.clear(); + // This cannot fail – the rcl_wait_set_clear function only checks that the input handle is + // valid, which it always is in our case. Hence, only debug_assert instead of returning + // Result. + // SAFETY: No preconditions for this function (besides passing in a valid wait set). + let ret = unsafe { rcl_wait_set_clear(&mut self.rcl_wait_set) }; + debug_assert_eq!(ret, 0); } /// Blocks until the wait set is ready, or until the timeout has been exceeded.