_sock_connect_cb can be called twice resulting in InvalidStateError #633

@thehesiod

I'm posting this bug here on Guido's recommendation, following the bug I logged upstream: http://bugs.python.org/issue25593. There appears to be a logic error around _sock_connect_cb: under certain circumstances it can get called twice, and the second call raises an InvalidStateError because the future has already been completed. If I apply the simple fix of returning early when the future is already done, I instead run into occasional connection errors.
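For context, here is roughly what the relevant code path looks like, paraphrased from CPython 3.5's asyncio/selector_events.py (not an exact copy; check your version's source). The writer callback is only removed via a done callback on the future, which presumably leaves a window where _sock_connect_cb can already be queued a second time:

# Paraphrased methods of asyncio.selector_events.BaseSelectorEventLoop
# (CPython 3.5 era); details may differ by version.
import functools
import socket

def _sock_connect(self, fut, sock, address):
    fd = sock.fileno()
    try:
        sock.connect(address)
    except (BlockingIOError, InterruptedError):
        # Connect in progress: invoke _sock_connect_cb when the socket
        # becomes writable.  The writer is only removed once the
        # future's done callback runs.
        fut.add_done_callback(functools.partial(self._sock_connect_done, fd))
        self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
    except Exception as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(None)

def _sock_connect_cb(self, fut, sock, address):
    if fut.cancelled():
        return
    try:
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, 'Connect call failed %s' % (address,))
    except (BlockingIOError, InterruptedError):
        pass  # not actually ready yet; the callback will fire again
    except Exception as exc:
        fut.set_exception(exc)
    else:
        # If the callback is invoked a second time before the writer
        # is removed, this set_result() raises InvalidStateError.
        fut.set_result(None)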

Here is my simple testcase:

import asyncio
import aiohttp
import multiprocessing
import traceback
import http.server
import time

# Monkey-patch workaround for http://bugs.python.org/issue25593:
# bail out of _sock_connect_cb if the future is already done.
# Change "if False:" to "if True:" to enable it.
if False:
    import asyncio.selector_events

    orig_sock_connect_cb = asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb

    def _sock_connect_cb(self, fut, sock, address):
        # Guard against the callback firing a second time after the
        # future has already completed.
        if fut.done():
            return
        return orig_sock_connect_cb(self, fut, sock, address)

    asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb = _sock_connect_cb
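# Note: with the guard enabled the InvalidStateError disappears, but the
# occasional connection errors described above show up instead.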


def process_worker(q):
    loop = asyncio.get_event_loop()
    connector = aiohttp.TCPConnector(force_close=False, keepalive_timeout=8, use_dns_cache=True)
    session = aiohttp.ClientSession(connector=connector)
    async_queue = asyncio.Queue(100)

    @asyncio.coroutine
    def async_worker(async_queue):

        while True:
            try:
                url = yield from async_queue.get()
                response = yield from session.request('GET', url)

                try:
                    data = yield from response.read()
                    print(data)
                finally:
                    yield from response.wait_for_close()
            except:
                traceback.print_exc()

    def worker_done(f):
        try:
            f.result()
            print("worker exited")
        except:
            traceback.print_exc()

    workers = []
    for i in range(100):
        t = asyncio.ensure_future(async_worker(async_queue))
        t.add_done_callback(worker_done)
        workers.append(t)

    def producer(q):
        # Blocking read from the multiprocessing queue; runs in an
        # executor thread so it doesn't block the event loop.
        obj2 = q.get()
        return obj2

    @asyncio.coroutine
    def doit():
        obj = yield from loop.run_in_executor(None, producer, q)
        yield from async_queue.put(obj)

    while True:
        loop.run_until_complete(doit())


class GetHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        f = self.send_head()
        time.sleep(0.5)
        if f:
            try:
                self.copyfile(f, self.wfile)
            finally:
                f.close()


def run_server(server_class=http.server.HTTPServer, handler_class=GetHandler):
    server_address = ('0.0.0.0', 8080)
    httpd = server_class(server_address, handler_class)
    httpd.serve_forever()


if __name__ == '__main__':
    q = multiprocessing.Queue(100)

    p = multiprocessing.Process(target=process_worker, args=(q,))
    p.start()

    p2 = multiprocessing.Process(target=run_server)
    p2.start()

    while True:
        q.put("http://0.0.0.0:8080")
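Run the script as-is: it starts the throttled HTTP server (each GET sleeps 0.5s) and 100 client workers in separate processes and keeps feeding them the same URL. After running for a while, the workers eventually trigger the InvalidStateError from _sock_connect_cb.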
