Skip to content

try ... except ... not working properly when using asyncio for heavy concurrency #107285

Closed as not planned
@MidCheck

Description

@MidCheck

Bug report

When I use asyncio to write a port scanner, I connect the socket in the try statement, and use 'asyncio.timeout' and 'asyncio.wait_for' for timeout processing. When the concurrency reaches more than 5000, this problem occurs.

The main part of the code is as follows:

class Scan:
    def __init__(self, targets: List[str], ports: List[int]):
        self.ports = ports
        self.targets = targets
        self.loop = asyncio.get_running_loop()
        self.result_queue = asyncio.Queue()
        self.target_queue = asyncio.Queue()

    async def scan_target_port_worker(self, worker_name: str, timeout: float=1):
        while True:
            try:
                target, port = await self.target_queue.get()
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.setblocking(False)
                result = False
                while True:
                    try:
                        # await asyncio.wait_for(self.loop.sock_connect(sock, (target, port)), timeout)
                        async with asyncio.timeout(timeout):
                            await self.loop.sock_connect(sock, (target, port))
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][1] [{target},{port}] -> {sock.getpeername()}")
                    except asyncio.TimeoutError:
                        if timeout > 10:
                            print(f"[-] connect ({target}, {port}) timeout: {timeout}")
                            break
                        timeout += 0.5
                        continue
                    except (ConnectionRefusedError, ConnectionResetError, ConnectionAbortedError) as e:
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][2] [{target},{port}] -> {str(e)}")
                            raise
                        pass
                    except Exception as e:
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][3] [{target},{port}] -> {str(e)}")
                            raise
                        print(f"[-] connect ({target},{port}) errno: {e.errno}, error: {str(e)}")
                    else:
                        result = True
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][5] [{target},{port}] -> {result}")
                    finally:
                        if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
                            print(f"[{worker_name}][6] [{target},{port},{timeout}] -> result: {result} ---> {sock}\n")
                        await self.result_queue.put((target, port, result, worker_name, timeout))
                        sock.close()
                        self.target_queue.task_done()
                        break
            except asyncio.CancelledError:
                break
            except Exception as e:
                print("[-] open tcp port other error:", str(e))
                break
    
    async def result_consumer(self, total: int):
        response = {}
        while True:
            try:
                host, port, is_open, work_name, timeout = await asyncio.wait_for(self.result_queue.get(), 5)
            except asyncio.TimeoutError:
                print(f"[-] wait result timeout, except {total} results")
                break
            except asyncio.CancelledError:
                break
            else:
                if is_open:
                    print(f"[consumer][{work_name}] [{host},{port},{timeout}] -> {is_open}")
                    if host not in response:
                        response[host] = []
                    response[host].append(port)
                total -= 1
                if total == 0:
                    break
                if (total % 10000) == 0:
                    print(f"[consumer] total:", total)
        return response
    
    async def scan_port(self, workers: int=5000):
        start_time = time.time()
        tasks = [asyncio.create_task(self.scan_target_port_worker(worker_name=f"worker {i}")) for i in range(workers)]
        total= len(self.targets) * len(self.ports)
        consumer_task = asyncio.create_task(self.result_consumer(total))

        for target in self.targets:
            for port in self.ports:
                self.target_queue.put_nowait((target, port))
        print("[.] all targets are to queue, total: %d size: %d" % (total, self.target_queue.qsize()))
        await self.target_queue.join()
        print("[.] target join finish")
        try:
            await consumer_task
        except Exception as e:
            print("[-] await consumer error:", str(e))
            raise
        else:
            response = consumer_task.result()
            if not consumer_task.cancelled():
                consumer_task.cancel()
        
        for task in tasks:
            task.cancel()

        count = int(time.time() - start_time)
        print(f"[.] total {count} seconds")
        return response

This code is called like this 'Scan(['192.168.22.2', '192.168.179.30'], [1-65535]).scan_port() ', 'targets' are two LAN IPs, 'ports' are all ports 1-65535.

When using asyncio.timeout, the output is:

[.] all targets are to queue, total: 131068 size: 131068
[worker 0][6] [192.168.22.2,1,1.5] -> result: False ---> <socket.socket fd=15, family=2, type=1, proto=0, laddr=('192.168.21.253', 39464)>

[worker 21][6] [192.168.22.2,22,1.5] -> result: False ---> <socket.socket fd=36, family=2, type=1, proto=0, laddr=('192.168.21.253', 57586), raddr=('192.168.22.2', 22)>

[worker 3079][1] [192.168.22.2,8080] -> ('192.168.22.2', 8080)
[worker 3079][5] [192.168.22.2,8080] -> True
[worker 3079][6] [192.168.22.2,8080,1] -> result: True ---> <socket.socket fd=3094, family=2, type=1, proto=0, laddr=('192.168.21.253', 52826), raddr=('192.168.22.2', 8080)>

[worker 3080][1] [192.168.22.2,8081] -> ('192.168.22.2', 8081)
[worker 3080][5] [192.168.22.2,8081] -> True
[worker 3080][6] [192.168.22.2,8081,1] -> result: True ---> <socket.socket fd=3095, family=2, type=1, proto=0, laddr=('192.168.21.253', 58416), raddr=('192.168.22.2', 8081)>

[worker 3081][1] [192.168.22.2,8082] -> ('192.168.22.2', 8082)
[worker 3081][5] [192.168.22.2,8082] -> True
[worker 3081][6] [192.168.22.2,8082,1] -> result: True ---> <socket.socket fd=3096, family=2, type=1, proto=0, laddr=('192.168.21.253', 51796), raddr=('192.168.22.2', 8082)>

[.] target join finish
[consumer] total: 130000
…
[consumer][worker 3079] [192.168.22.2,8080,1] -> True
[consumer][worker 3080] [192.168.22.2,8081,1] -> True
[consumer][worker 3081] [192.168.22.2,8082,1] -> True
…
[consumer] all target has be consumer
[.] total 24 seconds

It can be seen that the connection to port 22 is successful, no exception is triggered, but the else statement is not executed.

When using asyncio.wait_for, also have this problem, just the port is different.

I can avoid this problem by adjusting the number of workers and the value of timeout, but this should be a bug.

I'm guessing the problem might be related to this. #96037

Your environment

This problem occurs on both Ubuntu and MacOS, not tested for windows.

Ubuntu:

Python 3.11.2 (main, May 30 2023, 17:45:26) [GCC 12.2.0] on linux
Ubuntu 23.04 x86_64
CPU: 12th Gen Intel(R) Core(TM) i5-12450H

MacOS:

Python 3.11.3 (main, Apr  7 2023, 20:13:31) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
macOS Ventura 13.4.1 (c)
CPU: Apple M2 Max

Metadata

Metadata

Assignees

No one assigned

    Labels

    pendingThe issue will be closed if no feedback is providedtopic-asynciotype-bugAn unexpected behavior, bug, or error

    Projects

    Status

    Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions