Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Remove JoinHandle #83

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 15 additions & 29 deletions benches/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate test;

mod baseline {
use futures::executor;
use futures::future::RemoteHandle;
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -42,7 +43,7 @@ mod baseline {

let tasks = (0..300)
.map(|_| {
spawn(async move {
spawn_remote(async move {
Task { depth: 0 }.await;
})
})
Expand All @@ -59,7 +60,7 @@ mod baseline {
fn spawn_many(b: &mut test::Bencher) {
b.iter(|| {
executor::block_on(async {
let tasks = (0..25_000).map(|_| spawn(async {})).collect::<Vec<_>>();
let tasks = (0..25_000).map(|_| spawn_remote(async {})).collect::<Vec<_>>();

for task in tasks {
task.await;
Expand All @@ -78,7 +79,7 @@ mod baseline {
executor::block_on(async {
let tasks = (0..300)
.map(|_| {
spawn(async {
spawn_remote(async {
let (r, s) = mio::Registration::new2();
let registration = Registration::new();
registration.register(&r).unwrap();
Expand Down Expand Up @@ -112,39 +113,24 @@ mod baseline {
});
}

/// Spawn function for juliex
pub fn spawn<F>(fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
juliex::spawn(fut);
}

/// Spawn function for juliex to get back a handle
pub fn spawn<F, T>(fut: F) -> JoinHandle<T>
pub fn spawn_remote<F, T>(fut: F) -> RemoteHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = futures::channel::oneshot::channel();

let fut = async move {
let t = fut.await;
let _ = tx.send(t);
};
let (fut, handle) = fut.remote_handle();

juliex::spawn(fut);
JoinHandle { rx }
}

/// Handle returned from Juliex.
// We should patch Juliex to support this natively, and be more efficient on channel use.
#[derive(Debug)]
pub struct JoinHandle<T> {
pub(crate) rx: futures::channel::oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.rx.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(t)) => Poll::Ready(t),
Poll::Ready(Err(_)) => panic!(), // TODO: Is this OK? Print a better error message?
}
}
handle
}
}
8 changes: 4 additions & 4 deletions benches/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ macro_rules! benchmark_suite {

let tasks = (0..300)
.map(|_| {
runtime::spawn(async {
runtime::task::spawn_remote(async {
Task { depth: 0 }.await;
})
})
Expand All @@ -44,7 +44,7 @@ macro_rules! benchmark_suite {
#[runtime::bench($rt)]
async fn spawn_many() {
let tasks = (0..25_000)
.map(|_| runtime::spawn(async {}))
.map(|_| runtime::task::spawn_remote(async {}))
.collect::<Vec<_>>();

for task in tasks {
Expand All @@ -61,15 +61,15 @@ macro_rules! benchmark_suite {

let tasks = (0..300)
.map(|_| {
runtime::spawn(async {
runtime::task::spawn_remote(async {
let (r, s) = mio::Registration::new2();
let registration = Registration::new();
registration.register(&r).unwrap();

let mut depth = 0;
let mut capture = Some(r);

runtime::spawn(
runtime::task::spawn(
Compat01As03::new(future::poll_fn(move || loop {
if registration.poll_read_ready().unwrap().is_ready() {
depth += 1;
Expand Down
2 changes: 1 addition & 1 deletion examples/guessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn main() -> Result<(), failure::Error> {
incoming
.try_for_each_concurrent(None, |stream| {
async move {
runtime::spawn(play(stream)).await?;
runtime::task::spawn_remote(play(stream)).await?;
Ok::<(), failure::Error>(())
}
})
Expand Down
2 changes: 1 addition & 1 deletion examples/tcp-echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> std::io::Result<()> {
.incoming()
.try_for_each_concurrent(None, |stream| {
async move {
runtime::spawn(async move {
runtime::task::spawn_remote(async move {
println!("Accepting from: {}", stream.peer_addr()?);

let (reader, writer) = &mut stream.split();
Expand Down
2 changes: 1 addition & 1 deletion examples/tcp-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> std::io::Result<()> {
.incoming()
.try_for_each_concurrent(None, |client| {
async move {
runtime::spawn(async move {
runtime::task::spawn_remote(async move {
let server = TcpStream::connect("127.0.0.1:8080").await?;
println!(
"Proxying {} to {}",
Expand Down
2 changes: 1 addition & 1 deletion runtime-attributes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub fn test(attr: TokenStream, item: TokenStream) -> TokenStream {
///
/// #[runtime::test]
/// async fn spawn_and_await() {
/// runtime::spawn(async {}).await;
/// runtime::task::spawn_remote(async {}).await;
/// }
/// ```
#[proc_macro_attribute]
Expand Down
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ pub mod prelude {
pub use super::time::StreamExt as _;
}

#[doc(inline)]
pub use task::spawn;

#[doc(inline)]
pub use runtime_attributes::{bench, test};

Expand Down
4 changes: 2 additions & 2 deletions src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,14 @@ impl fmt::Debug for ConnectFuture {
/// // accept connections and process them in parallel
/// let mut incoming = listener.incoming();
/// while let Some(stream) = incoming.next().await {
/// runtime::spawn(async move {
/// runtime::task::spawn_remote(async move {
/// let stream = stream?;
/// println!("Accepting from: {}", stream.peer_addr()?);
///
/// let (reader, writer) = &mut stream.split();
/// reader.copy_into(writer).await?;
/// Ok::<(), std::io::Error>(())
/// });
/// }).forget();
/// }
/// Ok(())
/// }
Expand Down
66 changes: 32 additions & 34 deletions src/task.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
//! Types and Functions for working with asynchronous tasks.

use std::pin::Pin;

use futures::future::FutureObj;
use futures::future::{FutureObj, RemoteHandle};
use futures::prelude::*;
use futures::task::{Context, Poll, Spawn, SpawnError};
use futures::task::{Spawn, SpawnError};

/// A [`Spawn`] handle to runtime's thread pool for spawning futures.
///
Expand Down Expand Up @@ -56,49 +54,49 @@ impl<'a> Spawn for &'a Spawner {
///
/// #[runtime::main]
/// async fn main() {
/// let handle = runtime::spawn(async {
/// runtime::task::spawn(async {
/// // might not run at all as we're not waiting for it to be done
/// println!("running the future");
/// });
/// }
/// ```
pub fn spawn<F>(fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
runtime_raw::current_runtime()
.spawn_boxed(fut.boxed())
.expect("cannot spawn a future");
}

/// Spawns a future on the runtime's thread pool and makes the result available.
///
/// This function can only be called after a runtime has been initialized.
///
/// If the returned handle is dropped the future is aborted by default.
///
/// ```
/// #![feature(async_await)]
///
/// #[runtime::main]
/// async fn main() {
/// let handle = runtime::task::spawn_remote(async {
/// println!("running the future");
/// 42
/// });
/// assert_eq!(handle.await, 42);
/// }
/// ```
pub fn spawn<F, T>(fut: F) -> JoinHandle<T>
pub fn spawn_remote<F, T>(fut: F) -> RemoteHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = futures::channel::oneshot::channel();

let fut = async move {
let t = fut.await;
let _ = tx.send(t);
};
let (fut, handle) = fut.remote_handle();

runtime_raw::current_runtime()
.spawn_boxed(fut.boxed())
.expect("cannot spawn a future");

JoinHandle { rx }
}

/// A handle that awaits the result of a [`spawn`]ed future.
///
/// [`spawn`]: fn.spawn.html
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct JoinHandle<T> {
pub(crate) rx: futures::channel::oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.rx.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(t)) => Poll::Ready(t),
Poll::Ready(Err(_)) => panic!(), // TODO: Is this OK? Print a better error message?
}
}
handle
}
2 changes: 1 addition & 1 deletion tests/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use runtime_native::Native;

#[runtime::test(Native)]
async fn spawn() {
let handle = runtime::spawn(async {
let handle = runtime::task::spawn_remote(async {
println!("hello planet from Native");
42
});
Expand Down
2 changes: 1 addition & 1 deletion tests/tokio-current-thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn spawn() {
let handle = runtime::spawn(async {
let handle = runtime::task::spawn_remote(async {
println!("hello planet from Tokio current-thread");
42
});
Expand Down
2 changes: 1 addition & 1 deletion tests/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use runtime_tokio::Tokio;

#[runtime::test(Tokio)]
async fn spawn() {
let handle = runtime::spawn(async {
let handle = runtime::task::spawn_remote(async {
println!("hello planet from Tokio");
42
});
Expand Down