1
- # type: ignore
2
1
"""
3
2
Utility functions to create server sockets able to listen on both
4
3
IPv4 and IPv6.
39
38
Tuple ,
40
39
Optional ,
41
40
Text ,
42
- List
41
+ List ,
42
+ Union ,
43
+ overload
43
44
)
44
45
46
+ from typeguard import typechecked
47
+
45
48
__author__ = "Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>"
46
49
__license__ = "MIT"
47
50
48
51
52
+ @typechecked
49
53
def has_dual_stack (sock : Optional [socket .socket ] = None ) -> bool :
50
54
"""Return True if kernel allows creating a socket which is able to
51
55
listen for both IPv4 and IPv6 connections.
@@ -64,6 +68,7 @@ def has_dual_stack(sock: Optional[socket.socket] = None) -> bool:
64
68
return False
65
69
66
70
71
+ @typechecked
67
72
def create_server_sock (
68
73
address : Tuple [Text , int ],
69
74
family : Optional [socket .AddressFamily ] = None ,
@@ -170,6 +175,7 @@ class MultipleSocketsListener:
170
175
socket in the list.
171
176
"""
172
177
178
+ @typechecked
173
179
def __init__ (
174
180
self ,
175
181
addresses : List [Tuple [Text , int ]],
@@ -179,8 +185,8 @@ def __init__(
179
185
queue_size : int = 5
180
186
) -> None :
181
187
self ._pollster : Optional [select .poll ]
182
- self ._socks = []
183
- self ._sockmap = {}
188
+ self ._socks : List [ socket . socket ] = []
189
+ self ._sockmap : Dict [ int , socket . socket ] = {}
184
190
if hasattr (select , 'poll' ):
185
191
self ._pollster = select .poll ()
186
192
else :
@@ -206,13 +212,16 @@ def __init__(
206
212
if not completed :
207
213
self .close ()
208
214
209
- def __enter__ (self ):
215
+ @typechecked
216
+ def __enter__ (self ) -> 'MultipleSocketsListener' :
210
217
return self
211
218
212
- def __exit__ (self , * args ):
219
+ @typechecked
220
+ def __exit__ (self , exc_type : Any , exc_value : Any , traceback : Any ) -> None :
213
221
self .close ()
214
222
215
- def __repr__ (self ):
223
+ @typechecked
224
+ def __repr__ (self ) -> Text :
216
225
addrs = []
217
226
for sock in self ._socks :
218
227
try :
@@ -221,72 +230,101 @@ def __repr__(self):
221
230
addrs .append (())
222
231
return '<%s (%r) at %#x>' % (self .__class__ .__name__ , addrs , id (self ))
223
232
224
- def _poll (self ):
233
+ @typechecked
234
+ def _poll (self ) -> Optional [Any ]:
225
235
"""Return the first readable fd."""
236
+ fds_select : Optional [Tuple [List [Any ], List [Any ], List [Any ]]] = None
237
+ fds_poll : Optional [List [Tuple [int , int ]]] = None
226
238
timeout = self .gettimeout ()
227
239
if self ._pollster is None :
228
- fds = select .select (self ._sockmap .keys (), [], [], timeout )
229
- if timeout and fds == ([], [], []):
230
- raise socket . timeout ('timed out' )
240
+ fds_select = select .select (self ._sockmap .keys (), [], [], timeout )
241
+ if timeout and fds_select == ([], [], []):
242
+ raise TimeoutError ('timed out' )
231
243
else :
232
244
if timeout is not None :
233
245
timeout *= 1000
234
- fds = self ._pollster .poll (timeout )
235
- if timeout and fds == []:
236
- raise socket . timeout ('timed out' )
246
+ fds_poll = self ._pollster .poll (timeout )
247
+ if timeout and fds_poll == []:
248
+ raise TimeoutError ('timed out' )
237
249
try :
238
- return fds [0 ][0 ]
250
+ if fds_select is not None :
251
+ return fds_select [0 ][0 ]
252
+ if fds_poll is not None :
253
+ return fds_poll [0 ][0 ]
239
254
except IndexError :
240
255
pass # non-blocking socket
256
+ return None
241
257
242
- def _multicall (self , name : Text , * args : Tuple [Any ], ** kwargs : Dict [Text , Any ]) -> None :
258
+ @typechecked
259
+ def _multicall (self , name : Text , * args : Any , ** kwargs : Any ) -> None :
243
260
for sock in self ._socks :
244
261
meth = getattr (sock , name )
245
262
meth (* args , ** kwargs )
246
263
247
- def accept (self ) -> None :
264
+ @typechecked
265
+ def accept (self ) -> Tuple [socket .socket , Any ]:
248
266
"""Accept a connection from the first socket which is ready
249
267
to do so.
250
268
"""
251
269
fd = self ._poll ()
252
270
sock = self ._sockmap [fd ] if fd else self ._socks [0 ]
253
271
return sock .accept ()
254
272
255
- def filenos (self ):
273
+ @typechecked
274
+ def filenos (self ) -> List [int ]:
256
275
"""Return sockets' file descriptors as a list of integers.
257
276
This is useful with select().
258
277
"""
259
278
return list (self ._sockmap .keys ())
260
279
261
280
262
- def getsockname (self ):
281
+ @typechecked
282
+ def getsockname (self ) -> Any :
263
283
"""Return first registered socket's own address."""
264
284
return self ._socks [0 ].getsockname ()
265
285
266
- def getsockopt (self , level , optname , buflen = 0 ):
286
+ @overload
287
+ def getsockopt (self , level : int , optname : int ) -> int : ...
288
+
289
+ @overload
290
+ def getsockopt (self , level : int , optname : int , buflen : int ) -> bytes : ...
291
+
292
+ @typechecked
293
+ def getsockopt (self , level : int , optname : int , buflen : int = 0 ) -> Union [int , bytes ]:
267
294
"""Return first registered socket's options."""
268
295
return self ._socks [0 ].getsockopt (level , optname , buflen )
269
296
270
- def gettimeout (self ) -> float :
297
+ @typechecked
298
+ def gettimeout (self ) -> Optional [float ]:
271
299
"""Return first registered socket's timeout."""
272
300
return self ._socks [0 ].gettimeout ()
273
301
302
+ @typechecked
274
303
def settimeout (self , timeout : float ) -> None :
275
304
"""Set timeout for all registered sockets."""
276
305
self ._multicall ('settimeout' , timeout )
277
306
278
- def setblocking (self , flag ):
307
+ @typechecked
308
+ def setblocking (self , flag : bool ) -> None :
279
309
"""Set non/blocking mode for all registered sockets."""
280
310
self ._multicall ('setblocking' , flag )
281
311
282
- def setsockopt (self , level , optname , value ):
312
+ @overload
313
+ def setsockopt (self , level : int , optname : int , value : Union [int , bytes ], optlen : None ) -> None : ...
314
+ @overload
315
+ def setsockopt (self , level : int , optname : int , value : None , optlen : int ) -> None : ...
316
+
317
+ @typechecked
318
+ def setsockopt (self , level : int , optname : int , value : Optional [Union [int , bytes ]], optlen : Optional [int ]) -> None :
283
319
"""Set option for all registered sockets."""
284
- self ._multicall ('setsockopt' , level , optname , value )
320
+ self ._multicall ('setsockopt' , level , optname , value , optlen )
285
321
286
- def shutdown (self , how ) -> None :
322
+ @typechecked
323
+ def shutdown (self , how : int ) -> None :
287
324
"""Shut down all registered sockets."""
288
325
self ._multicall ('shutdown' , how )
289
326
327
+ @typechecked
290
328
def close (self ) -> None :
291
329
"""Close all registered sockets."""
292
330
self ._multicall ('close' )
0 commit comments