Skip to content

ConcurrentStream usage with tokio leads to ACCESS_VIOLATION #2851

@inklesspen1rus

Description

@inklesspen1rus

Tried to use concurrent streams to sleep in parallel with tokio:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            tokio::time::sleep(Duration::from_secs(x as _)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

But sometimes I get crash:

> cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
     Running `target\debug\rstestss.exe`
error: process didn't exit successfully: `target\debug\rstestss.exe` (exit code: 0xc0000005, STATUS_ACCESS_VIOLATION)

Without "current_thread" flavor program just freeze

Other runtimes work fine:
async_std

use core::time::Duration;
use async_std;
use futures_concurrency::prelude::*;
use futures::prelude::*;

#[async_std::main]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            async_std::task::sleep(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

smol

use core::time::Duration;
use futures_concurrency::prelude::*;
use futures::prelude::*;

async fn main_async() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

fn main() {
    smol::block_on(main_async());
}

Also tokio runtime with smol Timer works fine:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            // tokio::time::sleep(Duration::from_secs(x as _)).await;
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

Is that Tokio issue?

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions