Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Proper runtime "enter" instead of spawn + local blocking #73

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion runtime-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ runtime-raw = { path = "../runtime-raw", version = "0.3.0-alpha.4" }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
async-datagram = "3.0.0"
juliex = "0.3.0-alpha.6"
lazy_static = "1.3.0"
romio = "0.3.0-alpha.9"
futures-timer = "0.3.0"

Expand Down
31 changes: 20 additions & 11 deletions runtime-native/src/not_wasm32.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use futures::prelude::*;
use futures::{future::BoxFuture, task::SpawnError};
use futures::{executor, future::BoxFuture, task::SpawnError};
use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval};
use lazy_static::lazy_static;

use std::io;
use std::net::SocketAddr;
Expand All @@ -16,21 +15,13 @@ use tcp::{TcpListener, TcpStream};
use time::{Delay, Interval};
use udp::UdpSocket;

lazy_static! {
static ref JULIEX_THREADPOOL: juliex::ThreadPool = {
juliex::ThreadPool::with_setup(|| {
runtime_raw::set_runtime(&Native);
})
};
}

/// The Native runtime.
#[derive(Debug)]
pub struct Native;

impl runtime_raw::Runtime for Native {
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
JULIEX_THREADPOOL.spawn_boxed(fut.into());
juliex::spawn(fut);
Ok(())
}

Expand Down Expand Up @@ -78,3 +69,21 @@ impl runtime_raw::Runtime for Native {
Box::pin(Interval { async_interval })
}
}

impl<F, T> runtime_raw::BlockingRuntime<F, T> for Native
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
fn block_on(&self, fut: F) -> T {
let pool = juliex::ThreadPool::with_setup(|| {
runtime_raw::set_runtime(&Native);
});

let (fut, handle) = fut.remote_handle();

pool.spawn(fut);

executor::block_on(handle)
}
}
67 changes: 46 additions & 21 deletions runtime-raw/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
rust_2018_idioms
)]

use futures::executor;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::task::SpawnError;
Expand Down Expand Up @@ -43,36 +42,41 @@ pub fn current_runtime() -> &'static dyn Runtime {
RUNTIME.with(|r| r.get().expect("the runtime has not been set"))
}

/// Set the current runtime.
/// Run passed function while passed runtime is set as current runtime.
pub fn enter_runtime<F, R>(runtime: &'static dyn Runtime, f: F) -> R
where
F: FnOnce() -> R
{
RUNTIME.with(move |r| {
assert!(r.get().is_none(), "the runtime has already been set");

struct Cleanup<'a>(&'a Cell<Option<&'static dyn Runtime>>);
impl Drop for Cleanup<'_> {
fn drop(&mut self) {
self.0.set(None);
}
}
let _cleanup = Cleanup(r);

r.set(Some(runtime));
f()
})
}

/// Set the current runtime (per thread).
///
/// This function must be called at the beginning of runtime's threads before they start polling
/// any futures.
///
/// Setting the runtime can't be undone or changed again; use `enter_runtime` instead to only set
/// it for a scope.
pub fn set_runtime(runtime: &'static dyn Runtime) {
RUNTIME.with(|r| {
assert!(r.get().is_none(), "the runtime has already been set");
r.set(Some(runtime))
});
}

/// Runs a future inside a runtime and blocks on the result.
pub fn enter<R, F, T>(rt: R, fut: F) -> T
where
R: Runtime,
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = futures::channel::oneshot::channel();

let fut = async move {
let t = fut.await;
let _ = tx.send(t);
};

rt.spawn_boxed(fut.boxed()).expect("cannot spawn a future");

executor::block_on(rx).expect("the main future has panicked")
}

/// The runtime trait.
pub trait Runtime: Send + Sync + 'static {
/// Spawn a new future.
Expand Down Expand Up @@ -117,3 +121,24 @@ pub trait Runtime: Send + Sync + 'static {
/// `Interval` would prevent it from being a trait object.
fn new_interval(&self, dur: Duration) -> Pin<Box<dyn Interval>>;
}

/// Runtime trait for runtimes supporting blocking.
pub trait BlockingRuntime<F, T>
where
F: Future<Output = T>,
{
/// Runs a future inside the runtime and blocks on the result.
///
/// Needs to call `enter_runtime` or `set_runtime` (only on background threads) in threads
/// running futures.
fn block_on(&self, fut: F) -> T;
}

/// Runs a future inside a runtime and blocks on the result.
pub fn enter<R, F, T>(rt: R, fut: F) -> T
where
R: BlockingRuntime<F, T>,
F: Future<Output = T>,
{
rt.block_on(fut)
}
1 change: 0 additions & 1 deletion runtime-tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ edition = "2018"
[dependencies]
futures-preview = { version = "0.3.0-alpha.16", features = ["compat", "io-compat"] }
futures01 = { package = "futures", version = "0.1" }
lazy_static = "1.3.0"
mio = "0.6.16"
runtime-raw = { path = "../runtime-raw", version = "0.3.0-alpha.4" }
tokio = "0.1.19"
135 changes: 45 additions & 90 deletions runtime-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@

use futures::{
compat::Future01CompatExt,
future::{BoxFuture, FutureExt, TryFutureExt},
future::{BoxFuture, Future, FutureExt, TryFutureExt},
task::SpawnError,
};
use lazy_static::lazy_static;
use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval};

use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{mpsc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

mod tcp;
Expand All @@ -33,25 +30,21 @@ use tcp::{TcpListener, TcpStream};
use time::{Delay, Interval};
use udp::UdpSocket;

/// The default Tokio runtime.
#[derive(Debug)]
pub struct Tokio;
// No matter how we "enter" the `BlockingRuntime` the `Runtime` interface to
// tokio is the same
struct TokioRuntime;

impl runtime_raw::Runtime for Tokio {
impl runtime_raw::Runtime for TokioRuntime {
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
lazy_static! {
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = {
tokio::runtime::Builder::new()
.after_start(|| {
runtime_raw::set_runtime(&Tokio);
})
.build()
.unwrap()
};
}

TOKIO_RUNTIME.executor().spawn(fut.unit_error().compat());
Ok(())
use tokio::executor::Executor;
let mut e = tokio::executor::DefaultExecutor::current();
e.spawn(Box::new(fut.unit_error().compat())).map_err(|e| {
if e.is_shutdown() {
SpawnError::shutdown()
} else {
panic!("can't handle tokio spawn error: {}", e);
}
})
}

fn connect_tcp_stream(
Expand Down Expand Up @@ -99,82 +92,44 @@ impl runtime_raw::Runtime for Tokio {
}
}

/// The single-threaded Tokio runtime based on `tokio-current-thread`.
/// The default Tokio runtime.
///
/// Uses a dedicated tokio instace to drive the runtime and cleans up afterwards.
#[derive(Debug)]
pub struct TokioCurrentThread;
pub struct Tokio;

impl runtime_raw::Runtime for TokioCurrentThread {
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
lazy_static! {
static ref TOKIO_RUNTIME: Mutex<tokio::runtime::current_thread::Handle> = {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
let handle = rt.handle();
tx.send(handle).unwrap();

runtime_raw::set_runtime(&TokioCurrentThread);
let forever = futures01::future::poll_fn(|| {
Ok::<futures01::Async<()>, ()>(futures01::Async::NotReady)
});
rt.block_on(forever).unwrap();
});

let handle = rx.recv().unwrap();
Mutex::new(handle)
};
}

TOKIO_RUNTIME
.lock()
.unwrap()
.spawn(fut.unit_error().compat())
impl<F, T> runtime_raw::BlockingRuntime<F, T> for Tokio
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
fn block_on(&self, fut: F) -> T {
let mut rt = tokio::runtime::Builder::new()
.after_start(move || {
runtime_raw::set_runtime(&TokioRuntime);
})
.build()
.unwrap();
Ok(())
}

fn connect_tcp_stream(
&self,
addr: &SocketAddr,
) -> BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
use futures01::Future;

let tokio_connect = tokio::net::TcpStream::connect(addr);
let connect = tokio_connect.map(|tokio_stream| {
Box::pin(TcpStream { tokio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
});
connect.compat().boxed()
}

fn bind_tcp_listener(
&self,
addr: &SocketAddr,
) -> io::Result<Pin<Box<dyn runtime_raw::TcpListener>>> {
let tokio_listener = tokio::net::TcpListener::bind(&addr)?;
Ok(Box::pin(TcpListener { tokio_listener }))
}

fn bind_udp_socket(
&self,
addr: &SocketAddr,
) -> io::Result<Pin<Box<dyn runtime_raw::UdpSocket>>> {
let tokio_socket = tokio::net::UdpSocket::bind(&addr)?;
Ok(Box::pin(UdpSocket { tokio_socket }))
runtime_raw::enter_runtime(&TokioRuntime, || {
rt.block_on(fut.unit_error().boxed().compat()).unwrap()
})
}
}

fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
let tokio_delay = TokioDelay::new(Instant::now() + dur);
Box::pin(Delay { tokio_delay })
}
/// The single-threaded Tokio runtime based on `tokio-current-thread`.
#[derive(Debug)]
pub struct TokioCurrentThread;

fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
let tokio_delay = TokioDelay::new(at);
Box::pin(Delay { tokio_delay })
}
impl<F, T> runtime_raw::BlockingRuntime<F, T> for TokioCurrentThread
where
F: Future<Output = T>,
{
fn block_on(&self, fut: F) -> T {
runtime_raw::enter_runtime(&TokioRuntime, move || {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();

fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
let tokio_interval = TokioInterval::new(Instant::now(), dur);
Box::pin(Interval { tokio_interval })
rt.block_on(fut.unit_error().boxed_local().compat()).unwrap()
})
}
}