tokio_timer::Interval with compat layer is too fast #1285

Closed
realcr opened this issue Oct 11, 2018 · 12 comments

Comments

@realcr

realcr commented Oct 11, 2018

Hi, I tried to use tokio_timer::Interval with the compat layer of Futures 0.3. It seems to produce ticks very fast, no matter which duration I give it.

Example code:

use std::time::{Duration, Instant};
use futures::future;
use futures::stream::StreamExt;
use futures::executor::LocalPool;
use tokio_timer::Interval;
use futures_util::compat::{Stream01CompatExt};

fn main() {
    let mut local_pool = LocalPool::new();

    let dur = Duration::new(1,0);
    let interval = Interval::new(Instant::now(), dur)
        .compat()
        .map(|_| ());

    let my_fut = interval.for_each(|_| {
        println!("Interval tick!");
        future::ready(())
    });
    local_pool.run_until(my_fut);
}

Expected behaviour: "Interval tick!" will be printed about every second.
Observed behaviour: "Interval tick!" is printed very quickly. Changing the numbers given to Duration::new() does not seem to change the behaviour.

My Cargo.toml:

[package]
name = "check_interval_compat"
version = "0.1.0"
authors = ["real"]
edition = "2018"

[dependencies]

futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "a25e51e" }
futures-util-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "a25e51e", features = ["compat"] }
tokio-timer = "0.2.7"

rustc version:

$ rustc --version
rustc 1.31.0-nightly (5af0bb830 2018-10-10)

Your work on Futures is highly appreciated!
Thank you for your help!

@Matthias247
Contributor

Matthias247 commented Oct 12, 2018

Afaik Tokio futures only run on a tokio executor and not yet on other executors. The example however tries to drive the future from a futures LocalPool executor. I guess that might be the reason that it doesn't work correctly.

@realcr
Author

realcr commented Oct 12, 2018

@Matthias247: Thanks for your reply! I didn't know Tokio has a separate executor. I have been working with Futures for over a year now, but the distinction between what is Tokio and what is Futures is still very confusing to me.

Do you think there is still any hope of doing what I'm trying to do, namely using Interval with the rest of my Futures 0.3 code? If so, what is your recommendation?

@Nemo157
Member

Nemo157 commented Oct 12, 2018

I was confused that you weren't getting an error about being unable to register the timeout with Tokio's reactor timer system; looking at the code, it seems like it should handle this correctly. Here's a small diff of your program that shows the issue:

diff --git src/main.rs src/main.rs
index 846d142..9c7ab60 100644
--- src/main.rs
+++ src/main.rs
@@ -10,11 +10,10 @@ fn main() {

     let dur = Duration::new(1,0);
     let interval = Interval::new(Instant::now(), dur)
-        .compat()
-        .map(|_| ());
+        .compat();

-    let my_fut = interval.for_each(|_| {
-        println!("Interval tick!");
+    let my_fut = interval.for_each(|r| {
+        println!("Interval tick!: {:?}", r);
         future::ready(())
     });

gives

Interval tick!: Err(Error(Shutdown))
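
To illustrate why the original program looked healthy, here is a minimal std-only sketch of the same effect. It uses plain iterators rather than the futures or Tokio APIs, and `TimerShutdown`, `failing_ticks`, `discard_items`, and `count_errors` are hypothetical names invented for the illustration. The assumption is that the compat layer turns the 0.1 stream into a stream of `Result` values, so a closure that ignores its argument treats `Ok` and `Err` items identically.

```rust
// Hypothetical stand-in for the timer's error type; this is NOT Tokio's real type.
#[derive(Debug, Clone, PartialEq)]
struct TimerShutdown;

// Simulates a timer stream polled without a running timer:
// every item it produces is an error.
fn failing_ticks(n: usize) -> Vec<Result<u64, TimerShutdown>> {
    (0..n).map(|_| Err(TimerShutdown)).collect()
}

// Mirrors `.map(|_| ())`: the closure never looks at its argument,
// so errors are turned into ordinary-looking unit values.
fn discard_items(items: &[Result<u64, TimerShutdown>]) -> Vec<()> {
    items.iter().map(|_| ()).collect()
}

// Keeps the `Result` around so failures can be observed and counted.
fn count_errors(items: &[Result<u64, TimerShutdown>]) -> usize {
    items.iter().filter(|r| r.is_err()).count()
}

fn main() {
    let items = failing_ticks(3);

    // With the items discarded, each failure looks like a successful tick.
    for _ in discard_items(&items) {
        println!("Interval tick!");
    }

    // With the items kept, the failures are visible.
    for r in &items {
        println!("Interval tick!: {:?}", r);
    }
    println!("errors: {}", count_errors(&items));
}
```

Inspecting or propagating the `Result` before mapping it away makes this kind of failure show up immediately instead of looking like a very fast timer.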

@realcr
Author

realcr commented Oct 12, 2018

@Nemo157 I see, the map() hid the errors!
In that case, what do you think would be the correct way to use an Interval Stream?

@realcr
Author

realcr commented Oct 12, 2018

This is my current hack:

use std::thread;
use std::time::{Duration, Instant};
use futures::future;
use futures::stream::StreamExt;
use futures::sink::{SinkExt};
use futures::executor::LocalPool;
use futures::channel::mpsc;
use tokio::prelude::{Sink as TokioSink, Stream as TokioStream, Future as TokioFuture};
use tokio::timer::Interval;


fn interval_thread(sender: mpsc::Sender<()>) {
    let dur = Duration::new(1,0);
    let interval = Interval::new(Instant::now(), dur)
        .map_err(|_| ());
    let my_tokio_fut = interval.for_each(move |t| {
        println!("Sending tick! {:?}", t);
        let c_sender = sender.clone().compat();
        c_sender.send(())
            .map_err(|_| ())
            .map(|_| ())
    });
    tokio::run(my_tokio_fut);
}

fn main() {
    let mut local_pool = LocalPool::new();

    let (sender, receiver) = mpsc::channel::<()>(0);
    thread::spawn(|| interval_thread(sender));

    let my_fut = receiver.for_each(|_| {
        println!("Interval tick!");
        future::ready(())
    });

    local_pool.run_until(my_fut);
}

It seems to work, but I had to spawn a new thread.
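
For comparison, the shape of this workaround (a dedicated thread that owns the timing and forwards ticks over a channel) can be sketched with the standard library alone. This is only an analogy under stated assumptions: it uses `std::thread::sleep` and `std::sync::mpsc` instead of Tokio's `Interval` and the futures channel, and `spawn_ticker` is a hypothetical helper name. Unlike `Interval::new(Instant::now(), dur)`, it waits one period before the first tick.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

// Hypothetical helper: spawns a thread that sleeps for `period` and then
// sends a tick, `count` times, and returns the receiving end of the channel.
fn spawn_ticker(period: Duration, count: usize) -> Receiver<()> {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        for _ in 0..count {
            thread::sleep(period);
            // Stop early if the receiver has been dropped.
            if sender.send(()).is_err() {
                break;
            }
        }
        // The sender is dropped here, which ends the receiver's iteration.
    });
    receiver
}

fn main() {
    let ticks = spawn_ticker(Duration::from_millis(100), 3);
    // Blocks on each tick; the loop ends once the ticker thread finishes.
    for () in ticks {
        println!("Interval tick!");
    }
}
```

The cost is the same as in the hack above: one extra thread whose only job is to keep time.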

@Nemo157
Member

Nemo157 commented Oct 15, 2018

In that case, what do you think would be the correct way to use an Interval Stream?

The correct way is to use the Tokio runtime as your executor to set up the global timer instance (or manually set it up via tokio_timer::with_default in a combinator yourself). Using the branch from #1286 to avoid the issue in #1283, this works:

#![feature(await_macro, async_await, futures_api)]

use tokio::timer::Interval;
use std::time::{Duration, Instant};
use futures::{FutureExt, StreamExt, TryFutureExt};
use futures::compat::Stream01CompatExt;

fn main() {
    let dur = Duration::new(1,0);
    let interval = Interval::new(Instant::now(), dur).compat();

    let my_fut = interval.for_each(async move |a| {
        println!("Interval tick!: {:?}", a);
    });
    tokio::run(my_fut.unit_error().boxed().compat());
}

@cramertj
Member

Closing this since the issue is that tokio::timer::Interval is being used with something other than the tokio runtime. Feel free to comment if you feel like this was closed improperly.

@kigawas

kigawas commented Oct 11, 2019

How can I use tokio 0.2's interval in tokio 0.1's runtime? It panics with 'timer error: timer is shutdown'.

I have a 0.3 future that uses 0.2's interval, but my runtime is 0.1's runtime. I use the compat module to convert the 0.3 future to a 0.1 future, and it panics.

@kigawas

kigawas commented Oct 15, 2019

How can I use tokio 0.2's interval in tokio 0.1's runtime? It panics with 'timer error: timer is shutdown'.

I have a 0.3 future that uses 0.2's interval, but my runtime is 0.1's runtime. I use the compat module to convert the 0.3 future to a 0.1 future, and it panics.

OK, I see: just use 0.1's interval and convert it to a 0.3 stream with compat, so that it runs in 0.1's runtime.

@LucioFranco
Member

correct

@kigawas

kigawas commented Oct 16, 2019

correct

But do you know the root cause of this? I read some of the code but didn't understand why it happens.

@LucioFranco
Member

Because we use thread locals to track the current timer instance to attach the delay to. Because of how Rust works, when a 0.1 library attempts to fetch that thread local under a 0.2 runtime, it is unable to find it.
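
A minimal sketch of that mechanism, assuming each crate version keeps its own thread-local "current timer" slot: the modules `timer_v1` and `timer_v2` and their `set_default`/`current` functions are hypothetical stand-ins, not Tokio's real API. Two semver-incompatible versions of a crate are compiled as separate crates, so each has its own statics, which the two modules below imitate.

```rust
// Hypothetical stand-in for one version of a timer crate.
mod timer_v1 {
    use std::cell::Cell;

    thread_local! {
        // This version's private "current timer" slot.
        static CURRENT: Cell<Option<u32>> = Cell::new(None);
    }

    // What this version's runtime would call when it starts.
    pub fn set_default(id: u32) {
        CURRENT.with(|c| c.set(Some(id)));
    }

    // What this version's timer types would call to find the timer.
    pub fn current() -> Option<u32> {
        CURRENT.with(|c| c.get())
    }
}

// Hypothetical stand-in for another version of the same crate.
// Same code, but a completely separate thread-local slot.
mod timer_v2 {
    use std::cell::Cell;

    thread_local! {
        static CURRENT: Cell<Option<u32>> = Cell::new(None);
    }

    pub fn set_default(id: u32) {
        CURRENT.with(|c| c.set(Some(id)));
    }

    pub fn current() -> Option<u32> {
        CURRENT.with(|c| c.get())
    }
}

fn main() {
    // A runtime of one version installs its timer on this thread...
    timer_v2::set_default(7);

    // ...and code from the same version can see it.
    println!("v2 lookup: {:?}", timer_v2::current());

    // Code from the other version reads its own, still empty, slot.
    println!("v1 lookup: {:?}", timer_v1::current());
}
```

In this sketch the lookup that finds nothing corresponds to the 'timer is shutdown' error reported above: the timer exists, but not in the slot the other version reads.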
