Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions capnp-futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ keywords = ["async"]

[dependencies]
capnp = { version = "0.23.0-alpha", path = "../capnp" }
embedded-io-async = { version = "0.7.0", optional = true }

[dependencies.futures-channel]
version = "0.3.0"
default-features = false
features = ["std"]

[dependencies.futures-util]
version = "0.3.0"
default-features = false
features = ["io", "std"]
features = ["io"]

[dev-dependencies.futures]
version = "0.3.0"
Expand All @@ -33,5 +33,12 @@ features = ["executor"]
capnp = { version = "0.23.0", path = "../capnp", features = ["quickcheck"] }
quickcheck = "1"


[features]
default = ["std"]
std = ["futures-channel/std", "futures-util/std", "embedded-io-async?/std", "alloc"]
alloc = ["futures/alloc", "embedded-io-async?/alloc"]
embedded-io = ["dep:embedded-io-async"]

[lints]
workspace = true
5 changes: 5 additions & 0 deletions capnp-futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(feature = "alloc")]
extern crate alloc;

pub use read_stream::ReadStream;
pub use write_queue::{write_queue, Sender};

Expand Down
6 changes: 3 additions & 3 deletions capnp-futures/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use capnp::{message, Error};
use futures_util::stream::Stream;
Expand Down
6 changes: 3 additions & 3 deletions capnp-futures/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where
}
}

impl<A> AsOutputSegments for ::std::rc::Rc<message::Builder<A>>
impl<A> AsOutputSegments for ::alloc::rc::Rc<message::Builder<A>>
where
A: message::Allocator,
{
Expand All @@ -187,7 +187,7 @@ where
}
}

impl<A> AsOutputSegments for ::std::sync::Arc<message::Builder<A>>
impl<A> AsOutputSegments for ::alloc::sync::Arc<message::Builder<A>>
where
A: message::Allocator,
{
Expand Down Expand Up @@ -238,7 +238,7 @@ where
buf[(idx - 1) * 4..idx * 4]
.copy_from_slice(&((segments[idx].len() / 8) as u32).to_le_bytes());
}
if segment_count % 2 == 0 {
if segment_count.is_multiple_of(2) {
for idx in (buf.len() - 4)..(buf.len()) {
buf[idx] = 0
}
Expand Down
28 changes: 14 additions & 14 deletions capnp-futures/src/serialize_packed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
//! Asynchronous reading and writing of messages using the
//! [packed stream encoding](https://capnproto.org/encoding.html#packing).

use std::pin::Pin;
use std::task::{Context, Poll};
use core::pin::Pin;
use core::task::{Context, Poll};

use capnp::serialize::OwnedSegments;
use capnp::{message, Result};
Expand Down Expand Up @@ -82,9 +82,9 @@ where
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
cx: &mut core::task::Context<'_>,
outbuf: &mut [u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
) -> Poll<core::result::Result<usize, std::io::Error>> {
let Self {
stage,
inner,
Expand Down Expand Up @@ -129,7 +129,7 @@ where
}
}
PackedReadStage::WritingZeroes => {
let num_zeroes = std::cmp::min(outbuf.len(), *num_run_bytes_remaining);
let num_zeroes = core::cmp::min(outbuf.len(), *num_run_bytes_remaining);

for value in outbuf.iter_mut().take(num_zeroes) {
*value = 0;
Expand Down Expand Up @@ -186,7 +186,7 @@ where
return Poll::Ready(Ok(ii));
}
PackedReadStage::WritingPassthrough => {
let upper_bound = std::cmp::min(*num_run_bytes_remaining, outbuf.len());
let upper_bound = core::cmp::min(*num_run_bytes_remaining, outbuf.len());
if upper_bound == 0 {
*stage = PackedReadStage::Start;
} else {
Expand Down Expand Up @@ -281,11 +281,11 @@ where
}
}

impl<W> std::future::Future for FinishPendingWrites<W>
impl<W> core::future::Future for FinishPendingWrites<W>
where
W: AsyncWrite + Unpin,
{
type Output = std::result::Result<(), capnp::Error>;
type Output = core::result::Result<(), capnp::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.inner.finish_pending_writes(cx)? {
Poll::Ready(()) => Poll::Ready(Ok(())),
Expand Down Expand Up @@ -332,7 +332,7 @@ where
&mut self,
cx: &mut Context<'_>,
mut inbuf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
) -> Poll<core::result::Result<usize, std::io::Error>> {
let mut inbuf_bytes_consumed: usize = 0;
let Self {
stage,
Expand All @@ -352,7 +352,7 @@ where

// copy inbuf into buf
let buf_bytes_remaining = 8 - *buf_pos;
let bytes_to_copy = std::cmp::min(buf_bytes_remaining, inbuf.len());
let bytes_to_copy = core::cmp::min(buf_bytes_remaining, inbuf.len());
buf[*buf_pos..(*buf_pos + bytes_to_copy)]
.copy_from_slice(&inbuf[..bytes_to_copy]);
inbuf = &inbuf[bytes_to_copy..];
Expand Down Expand Up @@ -488,7 +488,7 @@ where
fn finish_pending_writes(
&mut self,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
) -> Poll<core::result::Result<(), std::io::Error>> {
while self.stage == PackedWriteStage::WriteWord
|| self.stage == PackedWriteStage::WriteRunWordCount
{
Expand All @@ -509,14 +509,14 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
inbuf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
) -> Poll<core::result::Result<usize, std::io::Error>> {
(*self).poll_write_aux(cx, inbuf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
) -> Poll<core::result::Result<(), std::io::Error>> {
match (*self).finish_pending_writes(cx)? {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => (),
Expand All @@ -528,7 +528,7 @@ where
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
) -> Poll<core::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}
Expand Down
12 changes: 6 additions & 6 deletions capnp-futures/src/write_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use std::future::Future;
use core::future::Future;

use futures_channel::oneshot;
use futures_util::{AsyncWrite, AsyncWriteExt, StreamExt, TryFutureExt};
Expand All @@ -41,7 +41,7 @@ where
M: AsOutputSegments,
{
sender: futures_channel::mpsc::UnboundedSender<Item<M>>,
in_flight: std::sync::Arc<std::sync::atomic::AtomicI32>,
in_flight: alloc::sync::Arc<core::sync::atomic::AtomicI32>,
}

impl<M> Clone for Sender<M>
Expand Down Expand Up @@ -69,7 +69,7 @@ where
{
let (tx, mut rx) = futures_channel::mpsc::unbounded::<Item<M>>();

let in_flight = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0));
let in_flight = alloc::sync::Arc::new(core::sync::atomic::AtomicI32::new(0));

let sender = Sender {
sender: tx,
Expand All @@ -81,7 +81,7 @@ where
match item {
Item::Message(m, returner) => {
let result = crate::serialize::write_message(&mut writer, &m).await;
in_flight.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
in_flight.fetch_sub(1, core::sync::atomic::Ordering::SeqCst);
result?;
writer.flush().await?;
let _ = returner.send(m);
Expand Down Expand Up @@ -123,7 +123,7 @@ where
/// has completed. Dropping the returned future does *not* cancel the write.
pub fn send(&mut self, message: M) -> impl Future<Output = Result<M, Error>> + Unpin {
self.in_flight
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
.fetch_add(1, core::sync::atomic::Ordering::SeqCst);
let (complete, oneshot) = oneshot::channel();

let _ = self.sender.unbounded_send(Item::Message(message, complete));
Expand All @@ -133,7 +133,7 @@ where

/// Returns the number of messages queued to be written.
pub fn len(&self) -> usize {
let result = self.in_flight.load(std::sync::atomic::Ordering::SeqCst);
let result = self.in_flight.load(core::sync::atomic::Ordering::SeqCst);
assert!(result >= 0);
result as usize
}
Expand Down
10 changes: 8 additions & 2 deletions capnp-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ readme = "README.md"
[dependencies.futures]
version = "0.3.0"
default-features = false
features = ["std"]

[dependencies]
capnp-futures = { version = "0.23.0", path = "../capnp-futures" }
capnp-futures = { version = "0.23.0", path = "../capnp-futures", default-features = false }
capnp = {version = "0.23.0", path = "../capnp"}
embedded-io-async = { version = "0.7.0", optional = true }

[features]
default = ["std"]
std = ["futures/std", "embedded-io-async?/std", "capnp-futures/std", "alloc"]
alloc = ["futures/alloc", "embedded-io-async?/alloc", "capnp-futures/alloc"]
embedded-io = ["dep:embedded-io-async"]

#[lints]
#workspace = true
Expand Down
4 changes: 2 additions & 2 deletions capnp-rpc/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use core::pin::Pin;
use core::task::{Context, Poll};
use futures::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct AttachFuture<F, T>
where
Expand Down
4 changes: 3 additions & 1 deletion capnp-rpc/src/broken.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use alloc::boxed::Box;
use alloc::vec::Vec;
use capnp::any_pointer;
use capnp::private::capability::{
ClientHook, ParamsHook, PipelineHook, PipelineOp, RequestHook, ResultsHook,
Expand All @@ -28,7 +30,7 @@ use capnp::Error;
use capnp::capability::{Promise, RemotePromise};
use capnp::traits::ImbueMut;

use std::rc::Rc;
use alloc::rc::Rc;

pub struct Pipeline {
error: Error,
Expand Down
8 changes: 4 additions & 4 deletions capnp-rpc/src/flow_control.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use capnp::capability::Promise;
use capnp::Error;

use alloc::{boxed::Box, rc::Rc, vec, vec::Vec};
use core::cell::RefCell;
use futures::channel::oneshot;
use futures::TryFutureExt;
use std::cell::RefCell;
use std::rc::Rc;

use crate::task_set::{TaskReaper, TaskSet, TaskSetHandle};

Expand Down Expand Up @@ -47,7 +47,7 @@ impl TaskReaper<Error> for Reaper {
fn task_failed(&mut self, error: Error) {
let mut inner = self.inner.borrow_mut();
if let State::Running(ref mut blocked_sends) = &mut inner.state {
for s in std::mem::take(blocked_sends) {
for s in core::mem::take(blocked_sends) {
let _ = s.send(Err(error.clone()));
}
inner.state = State::Failed(error)
Expand Down Expand Up @@ -99,7 +99,7 @@ impl crate::FlowController for FixedWindowFlowController {
match inner.state {
State::Running(ref mut blocked_sends) => {
if is_ready {
for s in std::mem::take(blocked_sends) {
for s in core::mem::take(blocked_sends) {
let _ = s.send(Ok(()));
}
}
Expand Down
37 changes: 29 additions & 8 deletions capnp-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,24 @@
//!
//! For a more complete example, see <https://github.com/capnproto/capnproto-rust/tree/master/capnp-rpc/examples/calculator>

#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(feature = "alloc")]
extern crate alloc;

use alloc::boxed::Box;
use alloc::format;
use alloc::rc::{Rc, Weak};
use alloc::string::ToString;
use alloc::vec::Vec;
use capnp::capability::Promise;
use capnp::private::capability::ClientHook;
use capnp::Error;
use core::cell::RefCell;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::channel::oneshot;
use futures::{Future, FutureExt, TryFutureExt};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};

pub use crate::rpc::Disconnector;
use crate::task_set::TaskSet;
Expand Down Expand Up @@ -402,8 +411,11 @@ pub struct CapabilityServerSet<S, C>
where
C: capnp::capability::FromServer<S>,
{
#[cfg(feature = "std")]
caps: std::collections::HashMap<usize, Weak<S>>,
marker: std::marker::PhantomData<C>,
#[cfg(not(feature = "std"))]
caps: alloc::collections::BTreeMap<usize, Weak<S>>,
marker: core::marker::PhantomData<C>,
}

impl<S, C> Default for CapabilityServerSet<S, C>
Expand All @@ -412,8 +424,8 @@ where
{
fn default() -> Self {
Self {
caps: std::default::Default::default(),
marker: std::marker::PhantomData,
caps: core::default::Default::default(),
marker: core::marker::PhantomData,
}
}
}
Expand Down Expand Up @@ -501,7 +513,16 @@ where
struct SystemTaskReaper;
impl crate::task_set::TaskReaper<Error> for SystemTaskReaper {
fn task_failed(&mut self, error: Error) {
println!("ERROR: {error}");
#[cfg(feature = "std")]
{
println!("ERROR: {error}");
}

#[cfg(not(feature = "std"))]
{
// In no_std environments we can't print the error, so just do nothing.
drop(error);
}
}
}

Expand Down
Loading
Loading