Skip to content

try ... except ... not working properly when using asyncio for heavy concurrency #107285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
MidCheck opened this issue Jul 26, 2023 · 2 comments
Closed
Labels
pending The issue will be closed if no feedback is provided topic-asyncio type-bug An unexpected behavior, bug, or error

Comments

@MidCheck
Copy link

MidCheck commented Jul 26, 2023

Bug report

When I use asyncio to write a port scanner, I connect the socket in the try statement, and use 'asyncio.timeout' and 'asyncio.wait_for' for timeout processing. When the concurrency reaches more than 5000, this problem occurs.

The main part of the code is as follows:

class Scan:
    def __init__(self, targets: List[str], ports: List[int]):
        self.ports = ports
        self.targets = targets
        self.loop = asyncio.get_running_loop()
        self.result_queue = asyncio.Queue()
        self.target_queue = asyncio.Queue()

    async def scan_target_port_worker(self, worker_name: str, timeout: float=1):
        while True:
            try:
                target, port = await self.target_queue.get()
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.setblocking(False)
                result = False
                while True:
                    try:
                        # await asyncio.wait_for(self.loop.sock_connect(sock, (target, port)), timeout)
                        async with asyncio.timeout(timeout):
                            await self.loop.sock_connect(sock, (target, port))
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][1] [{target},{port}] -> {sock.getpeername()}")
                    except asyncio.TimeoutError:
                        if timeout > 10:
                            print(f"[-] connect ({target}, {port}) timeout: {timeout}")
                            break
                        timeout += 0.5
                        continue
                    except (ConnectionRefusedError, ConnectionResetError, ConnectionAbortedError) as e:
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][2] [{target},{port}] -> {str(e)}")
                            raise
                        pass
                    except Exception as e:
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][3] [{target},{port}] -> {str(e)}")
                            raise
                        print(f"[-] connect ({target},{port}) errno: {e.errno}, error: {str(e)}")
                    else:
                        result = True
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][5] [{target},{port}] -> {result}")
                    finally:
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][6] [{target},{port},{timeout}] -> result: {result} ---> {sock}\n")
                        await self.result_queue.put((target, port, result, worker_name, timeout))
                        sock.close()
                        self.target_queue.task_done()
                        break
            except asyncio.CancelledError:
                break
            except Exception as e:
                print("[-] open tcp port other error:", str(e))
                break
    
    async def result_consumer(self, total: int):
        response = {}
        while True:
            try:
                host, port, is_open, work_name, timeout = await asyncio.wait_for(self.result_queue.get(), 5)
            except asyncio.TimeoutError:
                print(f"[-] wait result timeout, except {total} results")
                break
            except asyncio.CancelledError:
                break
            else:
                if is_open:
                    print(f"[consumer][{work_name}] [{host},{port},{timeout}] -> {is_open}")
                    if host not in response:
                        response[host] = []
                    response[host].append(port)
                total -= 1
                if total == 0:
                    break
                if (total % 10000) == 0:
                    print(f"[consumer] total:", total)
        return response
    
    async def scan_port(self, workers: int=5000):
        start_time = time.time()
        tasks = [asyncio.create_task(self.scan_target_port_worker(worker_name=f"worker {i}")) for i in range(workers)]
        total= len(self.targets) * len(self.ports)
        consumer_task = asyncio.create_task(self.result_consumer(total))

        for target in self.targets:
            for port in self.ports:
                self.target_queue.put_nowait((target, port))
        print("[.] all targets are to queue, total: %d size: %d" % (total, self.target_queue.qsize()))
        await self.target_queue.join()
        print("[.] target join finish")
        try:
            await consumer_task
        except Exception as e:
            print("[-] await consumer error:", str(e))
            raise
        else:
            response = consumer_task.result()
            if not consumer_task.cancelled():
                consumer_task.cancel()
        
        for task in tasks:
            task.cancel()

        count = int(time.time() - start_time)
        print(f"[.] total {count} seconds")
        return response

This code is called like this 'Scan(['192.168.22.2', '192.168.179.30'], [1-65535]).scan_port() ', 'targets' are two LAN IPs, 'ports' are all ports 1-65535.

When using asyncio.timeout, the output is:

[.] all targets are to queue, total: 131068 size: 131068
[worker 0][6] [192.168.22.2,1,1.5] -> result: False ---> <socket.socket fd=15, family=2, type=1, proto=0, laddr=('192.168.21.253', 39464)>

[worker 21][6] [192.168.22.2,22,1.5] -> result: False ---> <socket.socket fd=36, family=2, type=1, proto=0, laddr=('192.168.21.253', 57586), raddr=('192.168.22.2', 22)>

[worker 3079][1] [192.168.22.2,8080] -> ('192.168.22.2', 8080)
[worker 3079][5] [192.168.22.2,8080] -> True
[worker 3079][6] [192.168.22.2,8080,1] -> result: True ---> <socket.socket fd=3094, family=2, type=1, proto=0, laddr=('192.168.21.253', 52826), raddr=('192.168.22.2', 8080)>

[worker 3080][1] [192.168.22.2,8081] -> ('192.168.22.2', 8081)
[worker 3080][5] [192.168.22.2,8081] -> True
[worker 3080][6] [192.168.22.2,8081,1] -> result: True ---> <socket.socket fd=3095, family=2, type=1, proto=0, laddr=('192.168.21.253', 58416), raddr=('192.168.22.2', 8081)>

[worker 3081][1] [192.168.22.2,8082] -> ('192.168.22.2', 8082)
[worker 3081][5] [192.168.22.2,8082] -> True
[worker 3081][6] [192.168.22.2,8082,1] -> result: True ---> <socket.socket fd=3096, family=2, type=1, proto=0, laddr=('192.168.21.253', 51796), raddr=('192.168.22.2', 8082)>

[.] target join finish
[consumer] total: 130000
…
[consumer][worker 3079] [192.168.22.2,8080,1] -> True
[consumer][worker 3080] [192.168.22.2,8081,1] -> True
[consumer][worker 3081] [192.168.22.2,8082,1] -> True
…
[consumer] all target has be consumer
[.] total 24 seconds

It can be seen that the connection to port 22 is successful, no exception is triggered, but the else statement is not executed.

When using asyncio.wait_for, also have this problem, just the port is different.

I can avoid this problem by adjusting the number of workers and the value of timeout, but this should be a bug.

I'm guessing the problem might be related to this. #96037

Your environment

This problem occurs on both Ubuntu and MacOS, not tested for windows.

Ubuntu:

Python 3.11.2 (main, May 30 2023, 17:45:26) [GCC 12.2.0] on linux
Ubuntu 23.04 x86_64
CPU: 12th Gen Intel(R) Core(TM) i5-12450H

MacOS:

Python 3.11.3 (main, Apr  7 2023, 20:13:31) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
macOS Ventura 13.4.1 (c)
CPU: Apple M2 Max
@MidCheck MidCheck added the type-bug An unexpected behavior, bug, or error label Jul 26, 2023
@github-project-automation github-project-automation bot moved this to Todo in asyncio Jul 26, 2023
@gvanrossum
Copy link
Member

I'm sorry, but such a complex repro is not suitable for reporting a bug. Please simplify the problem to something more reasonable.

@gvanrossum gvanrossum added the pending The issue will be closed if no feedback is provided label Aug 9, 2023
@MidCheck
Copy link
Author

I'm sorry, but such a complex repro is not suitable for reporting a bug. Please simplify the problem to something more reasonable.

Ok, I'll dig into this in my spare time and see if I can figure out why.

@iritkatriel iritkatriel closed this as not planned Won't fix, can't repro, duplicate, stale Oct 26, 2023
@github-project-automation github-project-automation bot moved this from Todo to Done in asyncio Oct 26, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
pending The issue will be closed if no feedback is provided topic-asyncio type-bug An unexpected behavior, bug, or error
Projects
Status: Done
Development

No branches or pull requests

4 participants