Skip to content

test_as_completed_async_for_cancel sporadic failure  #3498

@jrbourbeau

Description

@jrbourbeau

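https://travis-ci.org/dask/distributed/jobs/652202276#L1153-L1274

The test assumes `x = c.submit(inc, 1)` finishes within the 0.1 s before the callback cancels `y`. On this CI run, the cancelled `sleep` future `y` was yielded first by `as_completed`, so `assert L == [x, y]` failed at index 0.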

=================================== FAILURES ===================================

______________________ test_as_completed_async_for_cancel ______________________

    def test_func():

        result = None

        workers = []

        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

    

            async def coro():

                with dask.config.set(config):

                    s = False

                    for i in range(5):

                        try:

                            s, ws = await start_cluster(

                                nthreads,

                                scheduler,

                                loop,

                                security=security,

                                Worker=Worker,

                                scheduler_kwargs=scheduler_kwargs,

                                worker_kwargs=worker_kwargs,

                            )

                        except Exception as e:

                            logger.error(

                                "Failed to start gen_cluster, retrying",

                                exc_info=True,

                            )

                        else:

                            workers[:] = ws

                            args = [s] + workers

                            break

                    if s is False:

                        raise Exception("Could not start cluster")

                    if client:

                        c = await Client(

                            s.address,

                            loop=loop,

                            security=security,

                            asynchronous=True,

                            **client_kwargs

                        )

                        args = [c] + args

                    try:

                        future = func(*args)

                        if timeout:

                            future = asyncio.wait_for(future, timeout)

                        result = await future

                        if s.validate:

                            s.validate_state()

                    finally:

                        if client and c.status not in ("closing", "closed"):

                            await c._close(fast=s.status == "closed")

                        await end_cluster(s, workers)

                        await asyncio.wait_for(cleanup_global_workers(), 1)

    

                    try:

                        c = await default_client()

                    except ValueError:

                        pass

                    else:

                        await c._close(fast=True)

    

                    for i in range(5):

                        if all(c.closed() for c in Comm._instances):

                            break

                        else:

                            await asyncio.sleep(0.05)

                    else:

                        L = [c for c in Comm._instances if not c.closed()]

                        Comm._instances.clear()

                        # raise ValueError("Unclosed Comms", L)

                        print("Unclosed Comms", L)

    

                    return result

    

            result = loop.run_sync(

>               coro, timeout=timeout * 2 if timeout else timeout

            )

distributed/utils_test.py:957: 

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

../../../miniconda/envs/test-environment/lib/python3.7/site-packages/tornado/ioloop.py:532: in run_sync

    return future_cell[0].result()

distributed/utils_test.py:927: in coro

    result = await future

../../../miniconda/envs/test-environment/lib/python3.7/asyncio/tasks.py:442: in wait_for

    return fut.result()

../../../miniconda/envs/test-environment/lib/python3.7/site-packages/tornado/gen.py:748: in run

    yielded = self.gen.send(value)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <Client: not connected>

s = <Scheduler: "tcp://127.0.0.1:39668" processes: 0 cores: 0>

a = <Worker: 'tcp://127.0.0.1:32852', 0, closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>

b = <Worker: 'tcp://127.0.0.1:39460', 1, closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)

    def test_as_completed_async_for_cancel(c, s, a, b):

        x = c.submit(inc, 1)

        y = c.submit(sleep, 0.3)

        ac = as_completed([x, y])

    

        async def _():

            await gen.sleep(0.1)

            await y.cancel(asynchronous=True)

    

        c.loop.add_callback(_)

    

        L = []

    

        async def f():

            async for future in ac:

                L.append(future)

    

        yield f()

    

>       assert L == [x, y]

E       assert [<Future: can...685fba467442>] == [<Future: fin...23daed195134>]

E         At index 0 diff: <Future: cancelled, key: sleep-db9bdb7c8c76baa8f32b23daed195134> != <Future: finished, type: builtins.int, key: inc-aa226ae3a9f799819e1e685fba467442>

E         Use -v to get the full diff

distributed/tests/test_client.py:5800: AssertionError

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test — Intermittent failures on CI.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions