Skip to content

Commit 93856d4

Browse files
committed
asyncio: sync with Tulip
* Tulip issue #183: log socket events in debug mode - Log most important socket events: socket connected, new client, connection reset or closed by peer (EOF), etc. - Log time elapsed in DNS resolution (getaddrinfo) - Log pause/resume reading - Log time of SSL handshake - Log SSL handshake errors - Add a __repr__() method to many classes * Fix ProactorEventLoop() in debug mode. ProactorEventLoop._make_self_pipe() doesn't call call_soon() directly because it checks for the current loop which fails, because the method is called to build the event loop. * Cleanup _ProactorReadPipeTransport constructor. Not need to set again _read_fut attribute to None, it is already done in the base class.
1 parent b10d23a commit 93856d4

File tree

7 files changed

+219
-29
lines changed

7 files changed

+219
-29
lines changed

Lib/asyncio/base_events.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ def __init__(self, loop, sockets):
9494
self._active_count = 0
9595
self._waiters = []
9696

97+
def __repr__(self):
98+
return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
99+
97100
def _attach(self):
98101
assert self.sockets is not None
99102
self._active_count += 1
@@ -110,8 +113,6 @@ def close(self):
110113
return
111114
self.sockets = None
112115
for sock in sockets:
113-
# closing sockets will call asynchronously the _detach() method
114-
# which calls _wakeup() for the last socket
115116
self._loop._stop_serving(sock)
116117
if self._active_count == 0:
117118
self._wakeup()
@@ -276,6 +277,8 @@ def close(self):
276277
raise RuntimeError("cannot close a running event loop")
277278
if self._closed:
278279
return
280+
if self._debug:
281+
logger.debug("Close %r", self)
279282
self._closed = True
280283
self._ready.clear()
281284
self._scheduled.clear()
@@ -402,10 +405,39 @@ def run_in_executor(self, executor, callback, *args):
402405
def set_default_executor(self, executor):
403406
self._default_executor = executor
404407

408+
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
409+
msg = ["%s:%r" % (host, port)]
410+
if family:
411+
msg.append('family=%r' % family)
412+
if type:
413+
msg.append('type=%r' % type)
414+
if proto:
415+
msg.append('proto=%r' % proto)
416+
if flags:
417+
msg.append('flags=%r' % flags)
418+
msg = ', '.join(msg)
419+
logger.debug('Get addresss info %s', msg)
420+
421+
t0 = self.time()
422+
addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
423+
dt = self.time() - t0
424+
425+
msg = ('Getting addresss info %s took %.3f ms: %r'
426+
% (msg, dt * 1e3, addrinfo))
427+
if dt >= self.slow_callback_duration:
428+
logger.info(msg)
429+
else:
430+
logger.debug(msg)
431+
return addrinfo
432+
405433
def getaddrinfo(self, host, port, *,
406434
family=0, type=0, proto=0, flags=0):
407-
return self.run_in_executor(None, socket.getaddrinfo,
408-
host, port, family, type, proto, flags)
435+
if self._debug:
436+
return self.run_in_executor(None, self._getaddrinfo_debug,
437+
host, port, family, type, proto, flags)
438+
else:
439+
return self.run_in_executor(None, socket.getaddrinfo,
440+
host, port, family, type, proto, flags)
409441

410442
def getnameinfo(self, sockaddr, flags=0):
411443
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
@@ -492,6 +524,8 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
492524
sock.close()
493525
sock = None
494526
continue
527+
if self._debug:
528+
logger.debug("connect %r to %r", sock, address)
495529
yield from self.sock_connect(sock, address)
496530
except OSError as exc:
497531
if sock is not None:
@@ -524,6 +558,9 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
524558

525559
transport, protocol = yield from self._create_connection_transport(
526560
sock, protocol_factory, ssl, server_hostname)
561+
if self._debug:
562+
logger.debug("connected to %s:%r: (%r, %r)",
563+
host, port, transport, protocol)
527564
return transport, protocol
528565

529566
@coroutine
@@ -614,6 +651,15 @@ def create_datagram_endpoint(self, protocol_factory,
614651
waiter = futures.Future(loop=self)
615652
transport = self._make_datagram_transport(sock, protocol, r_addr,
616653
waiter)
654+
if self._debug:
655+
if local_addr:
656+
logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
657+
"created: (%r, %r)",
658+
local_addr, remote_addr, transport, protocol)
659+
else:
660+
logger.debug("Datagram endpoint remote_addr=%r created: "
661+
"(%r, %r)",
662+
remote_addr, transport, protocol)
617663
yield from waiter
618664
return transport, protocol
619665

@@ -694,6 +740,8 @@ def create_server(self, protocol_factory, host=None, port=None,
694740
sock.listen(backlog)
695741
sock.setblocking(False)
696742
self._start_serving(protocol_factory, sock, ssl, server)
743+
if self._debug:
744+
logger.info("%r is serving", server)
697745
return server
698746

699747
@coroutine

Lib/asyncio/proactor_events.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,23 @@ def __init__(self, loop, sock, protocol, waiter=None,
4141
# wait until protocol.connection_made() has been called
4242
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
4343

44+
def __repr__(self):
45+
info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
46+
if self._read_fut is not None:
47+
ov = "pending" if self._read_fut.ov.pending else "completed"
48+
info.append('read=%s' % ov)
49+
if self._write_fut is not None:
50+
if self._write_fut.ov.pending:
51+
info.append("write=pending=%s" % self._pending_write)
52+
else:
53+
info.append("write=completed")
54+
if self._buffer:
55+
bufsize = len(self._buffer)
56+
info.append('write_bufsize=%s' % bufsize)
57+
if self._eof_written:
58+
info.append('EOF written')
59+
return '<%s>' % ' '.join(info)
60+
4461
def _set_extra(self, sock):
4562
self._extra['pipe'] = sock
4663

@@ -55,7 +72,10 @@ def close(self):
5572
self._read_fut.cancel()
5673

5774
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
58-
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
75+
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
76+
if self._loop.get_debug():
77+
logger.debug("%r: %s", self, message, exc_info=True)
78+
else:
5979
self._loop.call_exception_handler({
6080
'message': message,
6181
'exception': exc,
@@ -108,7 +128,6 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
108128
def __init__(self, loop, sock, protocol, waiter=None,
109129
extra=None, server=None):
110130
super().__init__(loop, sock, protocol, waiter, extra, server)
111-
self._read_fut = None
112131
self._paused = False
113132
self._loop.call_soon(self._loop_reading)
114133

@@ -118,6 +137,8 @@ def pause_reading(self):
118137
if self._paused:
119138
raise RuntimeError('Already paused')
120139
self._paused = True
140+
if self._loop.get_debug():
141+
logger.debug("%r pauses reading", self)
121142

122143
def resume_reading(self):
123144
if not self._paused:
@@ -126,6 +147,8 @@ def resume_reading(self):
126147
if self._closing:
127148
return
128149
self._loop.call_soon(self._loop_reading, self._read_fut)
150+
if self._loop.get_debug():
151+
logger.debug("%r resumes reading", self)
129152

130153
def _loop_reading(self, fut=None):
131154
if self._paused:
@@ -166,6 +189,8 @@ def _loop_reading(self, fut=None):
166189
if data:
167190
self._protocol.data_received(data)
168191
elif data is not None:
192+
if self._loop.get_debug():
193+
logger.debug("%r received EOF", self)
169194
keep_open = self._protocol.eof_received()
170195
if not keep_open:
171196
self.close()
@@ -401,7 +426,9 @@ def _make_self_pipe(self):
401426
self._ssock.setblocking(False)
402427
self._csock.setblocking(False)
403428
self._internal_fds += 1
404-
self.call_soon(self._loop_self_reading)
429+
# don't check the current loop because _make_self_pipe() is called
430+
# from the event loop constructor
431+
self._call_soon(self._loop_self_reading, (), check_loop=False)
405432

406433
def _loop_self_reading(self, f=None):
407434
try:
@@ -426,6 +453,9 @@ def loop(f=None):
426453
try:
427454
if f is not None:
428455
conn, addr = f.result()
456+
if self._debug:
457+
logger.debug("%r got a new connection from %r: %r",
458+
server, addr, conn)
429459
protocol = protocol_factory()
430460
self._make_socket_transport(
431461
conn, protocol,

Lib/asyncio/selector_events.py

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@
2323
from .log import logger
2424

2525

26+
def _test_selector_event(selector, fd, event):
27+
# Test if the selector is monitoring 'event' events
28+
# for the file descriptor 'fd'.
29+
try:
30+
key = selector.get_key(fd)
31+
except KeyError:
32+
return False
33+
else:
34+
return bool(key.events & event)
35+
36+
2637
class BaseSelectorEventLoop(base_events.BaseEventLoop):
2738
"""Selector event loop.
2839
@@ -116,6 +127,9 @@ def _accept_connection(self, protocol_factory, sock,
116127
sslcontext=None, server=None):
117128
try:
118129
conn, addr = sock.accept()
130+
if self._debug:
131+
logger.debug("%r got a new connection from %r: %r",
132+
server, addr, conn)
119133
conn.setblocking(False)
120134
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
121135
pass # False alarm.
@@ -419,6 +433,26 @@ def __init__(self, loop, sock, protocol, extra, server=None):
419433
if self._server is not None:
420434
self._server._attach()
421435

436+
def __repr__(self):
437+
info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
438+
polling = _test_selector_event(self._loop._selector,
439+
self._sock_fd, selectors.EVENT_READ)
440+
if polling:
441+
info.append('read=polling')
442+
else:
443+
info.append('read=idle')
444+
445+
polling = _test_selector_event(self._loop._selector,
446+
self._sock_fd, selectors.EVENT_WRITE)
447+
if polling:
448+
state = 'polling'
449+
else:
450+
state = 'idle'
451+
452+
bufsize = self.get_write_buffer_size()
453+
info.append('write=<%s, bufsize=%s>' % (state, bufsize))
454+
return '<%s>' % ' '.join(info)
455+
422456
def abort(self):
423457
self._force_close(None)
424458

@@ -433,7 +467,10 @@ def close(self):
433467

434468
def _fatal_error(self, exc, message='Fatal error on transport'):
435469
# Should be called from exception handler only.
436-
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
470+
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
471+
if self._loop.get_debug():
472+
logger.debug("%r: %s", self, message, exc_info=True)
473+
else:
437474
self._loop.call_exception_handler({
438475
'message': message,
439476
'exception': exc,
@@ -492,6 +529,8 @@ def pause_reading(self):
492529
raise RuntimeError('Already paused')
493530
self._paused = True
494531
self._loop.remove_reader(self._sock_fd)
532+
if self._loop.get_debug():
533+
logger.debug("%r pauses reading", self)
495534

496535
def resume_reading(self):
497536
if not self._paused:
@@ -500,6 +539,8 @@ def resume_reading(self):
500539
if self._closing:
501540
return
502541
self._loop.add_reader(self._sock_fd, self._read_ready)
542+
if self._loop.get_debug():
543+
logger.debug("%r resumes reading", self)
503544

504545
def _read_ready(self):
505546
try:
@@ -512,6 +553,8 @@ def _read_ready(self):
512553
if data:
513554
self._protocol.data_received(data)
514555
else:
556+
if self._loop.get_debug():
557+
logger.debug("%r received EOF", self)
515558
keep_open = self._protocol.eof_received()
516559
if keep_open:
517560
# We're keeping the connection open so the
@@ -638,31 +681,37 @@ def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
638681
# SSL-specific extra info. (peercert is set later)
639682
self._extra.update(sslcontext=sslcontext)
640683

641-
self._on_handshake()
684+
if self._loop.get_debug():
685+
logger.debug("%r starts SSL handshake", self)
686+
start_time = self._loop.time()
687+
else:
688+
start_time = None
689+
self._on_handshake(start_time)
642690

643-
def _on_handshake(self):
691+
def _on_handshake(self, start_time):
644692
try:
645693
self._sock.do_handshake()
646694
except ssl.SSLWantReadError:
647-
self._loop.add_reader(self._sock_fd, self._on_handshake)
695+
self._loop.add_reader(self._sock_fd,
696+
self._on_handshake, start_time)
648697
return
649698
except ssl.SSLWantWriteError:
650-
self._loop.add_writer(self._sock_fd, self._on_handshake)
651-
return
652-
except Exception as exc:
653-
self._loop.remove_reader(self._sock_fd)
654-
self._loop.remove_writer(self._sock_fd)
655-
self._sock.close()
656-
if self._waiter is not None:
657-
self._waiter.set_exception(exc)
699+
self._loop.add_writer(self._sock_fd,
700+
self._on_handshake, start_time)
658701
return
659702
except BaseException as exc:
703+
if self._loop.get_debug():
704+
logger.warning("%r: SSL handshake failed",
705+
self, exc_info=True)
660706
self._loop.remove_reader(self._sock_fd)
661707
self._loop.remove_writer(self._sock_fd)
662708
self._sock.close()
663709
if self._waiter is not None:
664710
self._waiter.set_exception(exc)
665-
raise
711+
if isinstance(exc, Exception):
712+
return
713+
else:
714+
raise
666715

667716
self._loop.remove_reader(self._sock_fd)
668717
self._loop.remove_writer(self._sock_fd)
@@ -676,6 +725,10 @@ def _on_handshake(self):
676725
try:
677726
ssl.match_hostname(peercert, self._server_hostname)
678727
except Exception as exc:
728+
if self._loop.get_debug():
729+
logger.warning("%r: SSL handshake failed "
730+
"on matching the hostname",
731+
self, exc_info=True)
679732
self._sock.close()
680733
if self._waiter is not None:
681734
self._waiter.set_exception(exc)
@@ -696,6 +749,10 @@ def _on_handshake(self):
696749
self._loop.call_soon(self._waiter._set_result_unless_cancelled,
697750
None)
698751

752+
if self._loop.get_debug():
753+
dt = self._loop.time() - start_time
754+
logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
755+
699756
def pause_reading(self):
700757
# XXX This is a bit icky, given the comment at the top of
701758
# _read_ready(). Is it possible to evoke a deadlock? I don't
@@ -709,6 +766,8 @@ def pause_reading(self):
709766
raise RuntimeError('Already paused')
710767
self._paused = True
711768
self._loop.remove_reader(self._sock_fd)
769+
if self._loop.get_debug():
770+
logger.debug("%r pauses reading", self)
712771

713772
def resume_reading(self):
714773
if not self._paused:
@@ -717,6 +776,8 @@ def resume_reading(self):
717776
if self._closing:
718777
return
719778
self._loop.add_reader(self._sock_fd, self._read_ready)
779+
if self._loop.get_debug():
780+
logger.debug("%r resumes reading", self)
720781

721782
def _read_ready(self):
722783
if self._write_wants_read:
@@ -741,6 +802,8 @@ def _read_ready(self):
741802
self._protocol.data_received(data)
742803
else:
743804
try:
805+
if self._loop.get_debug():
806+
logger.debug("%r received EOF", self)
744807
keep_open = self._protocol.eof_received()
745808
if keep_open:
746809
logger.warning('returning true from eof_received() '

0 commit comments

Comments
 (0)