Description
Bug report
I wrote a port scanner with asyncio. Each connection attempt is made inside a `try` statement, with either `asyncio.timeout` or `asyncio.wait_for` enforcing the timeout. When the concurrency reaches more than 5000 workers, the problem described below occurs: a connection that succeeds is reported as failed.
The main part of the code is as follows:
class Scan:
def __init__(self, targets: List[str], ports: List[int]):
self.ports = ports
self.targets = targets
self.loop = asyncio.get_running_loop()
self.result_queue = asyncio.Queue()
self.target_queue = asyncio.Queue()
async def scan_target_port_worker(self, worker_name: str, timeout: float=1):
while True:
try:
target, port = await self.target_queue.get()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
result = False
while True:
try:
# await asyncio.wait_for(self.loop.sock_connect(sock, (target, port)), timeout)
async with asyncio.timeout(timeout):
await self.loop.sock_connect(sock, (target, port))
if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
print(f"[{worker_name}][1] [{target},{port}] -> {sock.getpeername()}")
except asyncio.TimeoutError:
if timeout > 10:
print(f"[-] connect ({target}, {port}) timeout: {timeout}")
break
timeout += 0.5
continue
except (ConnectionRefusedError, ConnectionResetError, ConnectionAbortedError) as e:
if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
print(f"[{worker_name}][2] [{target},{port}] -> {str(e)}")
raise
pass
except Exception as e:
if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
print(f"[{worker_name}][3] [{target},{port}] -> {str(e)}")
raise
print(f"[-] connect ({target},{port}) errno: {e.errno}, error: {str(e)}")
else:
result = True
if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
print(f"[{worker_name}][5] [{target},{port}] -> {result}")
finally:
if target == '192.168.22.2' and port in (1, 22, 8080, 8081, 8082):
print(f"[{worker_name}][6] [{target},{port},{timeout}] -> result: {result} ---> {sock}\n")
await self.result_queue.put((target, port, result, worker_name, timeout))
sock.close()
self.target_queue.task_done()
break
except asyncio.CancelledError:
break
except Exception as e:
print("[-] open tcp port other error:", str(e))
break
async def result_consumer(self, total: int):
response = {}
while True:
try:
host, port, is_open, work_name, timeout = await asyncio.wait_for(self.result_queue.get(), 5)
except asyncio.TimeoutError:
print(f"[-] wait result timeout, except {total} results")
break
except asyncio.CancelledError:
break
else:
if is_open:
print(f"[consumer][{work_name}] [{host},{port},{timeout}] -> {is_open}")
if host not in response:
response[host] = []
response[host].append(port)
total -= 1
if total == 0:
break
if (total % 10000) == 0:
print(f"[consumer] total:", total)
return response
async def scan_port(self, workers: int=5000):
start_time = time.time()
tasks = [asyncio.create_task(self.scan_target_port_worker(worker_name=f"worker {i}")) for i in range(workers)]
total= len(self.targets) * len(self.ports)
consumer_task = asyncio.create_task(self.result_consumer(total))
for target in self.targets:
for port in self.ports:
self.target_queue.put_nowait((target, port))
print("[.] all targets are to queue, total: %d size: %d" % (total, self.target_queue.qsize()))
await self.target_queue.join()
print("[.] target join finish")
try:
await consumer_task
except Exception as e:
print("[-] await consumer error:", str(e))
raise
else:
response = consumer_task.result()
if not consumer_task.cancelled():
consumer_task.cancel()
for task in tasks:
task.cancel()
count = int(time.time() - start_time)
print(f"[.] total {count} seconds")
return response
This code is called like `await Scan(['192.168.22.2', '192.168.179.30'], ports).scan_port()` from within a running event loop: `targets` are two LAN IPs, and `ports` is the list of all port numbers in the range 1-65535.
When using asyncio.timeout, the output is:
[.] all targets are to queue, total: 131068 size: 131068
[worker 0][6] [192.168.22.2,1,1.5] -> result: False ---> <socket.socket fd=15, family=2, type=1, proto=0, laddr=('192.168.21.253', 39464)>
[worker 21][6] [192.168.22.2,22,1.5] -> result: False ---> <socket.socket fd=36, family=2, type=1, proto=0, laddr=('192.168.21.253', 57586), raddr=('192.168.22.2', 22)>
[worker 3079][1] [192.168.22.2,8080] -> ('192.168.22.2', 8080)
[worker 3079][5] [192.168.22.2,8080] -> True
[worker 3079][6] [192.168.22.2,8080,1] -> result: True ---> <socket.socket fd=3094, family=2, type=1, proto=0, laddr=('192.168.21.253', 52826), raddr=('192.168.22.2', 8080)>
[worker 3080][1] [192.168.22.2,8081] -> ('192.168.22.2', 8081)
[worker 3080][5] [192.168.22.2,8081] -> True
[worker 3080][6] [192.168.22.2,8081,1] -> result: True ---> <socket.socket fd=3095, family=2, type=1, proto=0, laddr=('192.168.21.253', 58416), raddr=('192.168.22.2', 8081)>
[worker 3081][1] [192.168.22.2,8082] -> ('192.168.22.2', 8082)
[worker 3081][5] [192.168.22.2,8082] -> True
[worker 3081][6] [192.168.22.2,8082,1] -> result: True ---> <socket.socket fd=3096, family=2, type=1, proto=0, laddr=('192.168.21.253', 51796), raddr=('192.168.22.2', 8082)>
[.] target join finish
[consumer] total: 130000
…
[consumer][worker 3079] [192.168.22.2,8080,1] -> True
[consumer][worker 3080] [192.168.22.2,8081,1] -> True
[consumer][worker 3081] [192.168.22.2,8082,1] -> True
…
[consumer] all target has be consumer
[.] total 24 seconds
The log shows that the connection to port 22 succeeded (the socket printed by worker 21 has a `raddr`) and that no exception trace was printed, yet the `else` branch was not executed and the result is `False`.
The same problem occurs when using `asyncio.wait_for`; only the affected port is different.
I can avoid this problem by adjusting the number of workers and the value of timeout, but this should be a bug.
I suspect the problem is related to #96037.
Your environment
This problem occurs on both Ubuntu and macOS; it has not been tested on Windows.
Ubuntu:
Python 3.11.2 (main, May 30 2023, 17:45:26) [GCC 12.2.0] on linux
Ubuntu 23.04 x86_64
CPU: 12th Gen Intel(R) Core(TM) i5-12450H
MacOS:
Python 3.11.3 (main, Apr 7 2023, 20:13:31) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
macOS Ventura 13.4.1 (c)
CPU: Apple M2 Max
Metadata
Metadata
Assignees
Projects
Status