@@ -180,6 +180,13 @@ def _reset(self, down):
180
180
self ._down = down
181
181
self ._count = self ._participants if down else 0
182
182
183
+ def busy (self ):
184
+ if self ._down :
185
+ done = self ._count == self ._participants
186
+ else :
187
+ done = self ._count == 0
188
+ return not done
189
+
183
190
def _at_limit (self ): # Has count reached up or down limit?
184
191
limit = 0 if self ._down else self ._participants
185
192
return self ._count == limit
@@ -223,8 +230,7 @@ def release(self):
223
230
224
231
# Task Cancellation
225
232
226
- class StopTask (Exception ):
227
- pass
233
+ StopTask = asyncio .CancelledError # More descriptive name
228
234
229
235
class TaskId ():
230
236
def __init__ (self , taskid ):
@@ -247,73 +253,6 @@ async def sleep(t, granularity=100): # 100ms default
247
253
await asyncio .sleep_ms (granularity )
248
254
await asyncio .sleep_ms (rem )
249
255
250
- # The NamedTask class enables a coro to be identified by a user defined name.
251
- # It maintains a dict of [coroutine instance, barrier] indexed by name.
252
-
253
- class NamedTask ():
254
- tasks = {}
255
- @classmethod
256
- def cancel (cls , name ):
257
- return cls .pend_throw (name , StopTask )
258
-
259
- @classmethod
260
- def pend_throw (cls , taskname , ClsException ):
261
- if taskname in cls .tasks :
262
- # pend_throw() does nothing if the task does not exist
263
- # (because it has terminated).
264
- # Enable throwing arbitrary exceptions
265
- loop = asyncio .get_event_loop ()
266
- task = cls .tasks [taskname ][0 ] # coro
267
- prev = task .pend_throw (ClsException ()) # Instantiate exception
268
- if prev is False :
269
- loop .call_soon (task )
270
- return True
271
- return False
272
-
273
- @classmethod
274
- def is_running (cls , name ):
275
- return name in cls .tasks
276
-
277
- @classmethod
278
- async def end (cls , name ): # On completion remove it
279
- if name in cls .tasks :
280
- barrier = cls .tasks [name ][1 ]
281
- if barrier is not None :
282
- await barrier (nowait = True )
283
- del cls .tasks [name ]
284
-
285
- def __init__ (self , name , gf , * args , barrier = None ):
286
- if name in self .tasks :
287
- raise ValueError ('Task name "{}" already exists.' .format (name ))
288
- task = gf (TaskId (name ), * args )
289
- self .tasks [name ] = [task , barrier ]
290
- self .task = task
291
-
292
- def __call__ (self ):
293
- return self .task
294
-
295
- def __await__ (self ):
296
- return (yield from self .task )
297
-
298
- __iter__ = __await__
299
-
300
- # @namedtask
301
-
302
- def namedtask (f ):
303
- def new_gen (* args ):
304
- g = f (* args )
305
- # Task ID is args[1] if a bound method (else args[0])
306
- idx = 0 if isinstance (args [0 ], TaskId ) else 1
307
- name = args [idx ]()
308
- try :
309
- res = await g
310
- return res
311
- except StopTask :
312
- pass
313
- finally :
314
- await NamedTask .end (name )
315
- return new_gen
316
-
317
256
# Anonymous cancellable tasks. These are members of a group which is identified
318
257
# by a user supplied name/number (default 0). Class method cancel_all() cancels
319
258
# all tasks in a group and awaits confirmation (signalled by a task awaiting
@@ -322,27 +261,36 @@ def new_gen(*args):
322
261
323
262
class Cancellable ():
324
263
task_no = 0 # Generated task ID, index of tasks dict
325
- tasks = {} # Value is [coro, group]
326
- barrier = None # Instantiated by the cancel_all method
264
+ tasks = {} # Value is [coro, group, barrier] indexed by task_id
327
265
328
266
@classmethod
329
267
def _cancel (cls , task_no ):
330
- # pend_throw() does nothing if the task does not exist
331
- # (because it has terminated).
332
268
task = cls .tasks [task_no ][0 ]
333
- prev = task .pend_throw (StopTask ()) # Instantiate exception
334
- if prev is False :
335
- loop = asyncio .get_event_loop ()
336
- loop .call_soon (task )
269
+ asyncio .cancel (task )
337
270
338
271
@classmethod
339
- async def cancel_all (cls , group = 0 ):
340
- tokill = [ task_no for task_no in cls .tasks if cls . tasks [ task_no ][ 1 ] == group ]
341
- cls . barrier = Barrier (len (tokill ) + 1 ) # Include this task
272
+ async def cancel_all (cls , group = 0 , nowait = False ):
273
+ tokill = cls ._get_task_nos ( group )
274
+ barrier = Barrier (len (tokill ) + 1 ) # Include this task
342
275
for task_no in tokill :
276
+ cls .tasks [task_no ][2 ] = barrier
343
277
cls ._cancel (task_no )
344
- await cls .barrier
345
- cls .barrier = None # Make available for GC
278
+ await barrier (nowait = nowait )
279
+
280
+ @classmethod
281
+ def is_running (cls , group = 0 ):
282
+ tasks = cls ._get_task_nos (group )
283
+ if tasks is []:
284
+ return False
285
+ for task_no in tasks :
286
+ barrier = Cancellable .tasks [task_no ][2 ]
287
+ if barrier .busy ():
288
+ return True
289
+ return False
290
+
291
+ @classmethod
292
+ def _get_task_nos (cls , group ):
293
+ return [task_no for task_no in cls .tasks if cls .tasks [task_no ][1 ] == group ]
346
294
347
295
@classmethod
348
296
def end (cls , task_no ):
@@ -352,14 +300,15 @@ def end(cls, task_no):
352
300
@classmethod
353
301
async def stopped (cls , task_no ):
354
302
if task_no in cls .tasks :
303
+ barrier = Cancellable .tasks [task_no ][2 ]
304
+ await barrier (nowait = True )
355
305
del cls .tasks [task_no ]
356
- await Cancellable .barrier (nowait = True )
357
306
358
307
def __init__ (self , gf , * args , group = 0 ):
359
308
task = gf (TaskId (Cancellable .task_no ), * args )
360
309
if task in self .tasks :
361
310
raise ValueError ('Task already exists.' )
362
- self .tasks [Cancellable .task_no ] = [task , group ]
311
+ self .tasks [Cancellable .task_no ] = [task , group , None ]
363
312
Cancellable .task_no += 1
364
313
self .task = task
365
314
@@ -388,48 +337,59 @@ def new_gen(*args):
388
337
Cancellable .end (task_no )
389
338
return new_gen
390
339
340
+ # The NamedTask class enables a coro to be identified by a user defined name.
341
+ # It maintains a dict of [coroutine instance, barrier] indexed by name.
342
+
343
+ class NamedTask ():
344
+ tasks = {}
345
+ @classmethod
346
+ def cancel (cls , name ):
347
+ if name in cls .tasks :
348
+ task = cls .tasks [name ][0 ] # coro
349
+ asyncio .cancel (task )
350
+ return True
351
+ return False
352
+
353
+ @classmethod
354
+ def is_running (cls , name ):
355
+ return name in cls .tasks
391
356
392
- # ExitGate is ***obsolete*** since uasyncio now supports task cancellation.
357
+ @classmethod
358
+ async def end (cls , name ): # On completion remove it
359
+ if name in cls .tasks :
360
+ barrier = cls .tasks [name ][1 ]
361
+ if barrier is not None :
362
+ await barrier (nowait = True )
363
+ del cls .tasks [name ]
393
364
394
- class ExitGate ():
395
- def __init__ (self , granularity = 100 ):
396
- self ._ntasks = 0
397
- self ._ending = False
398
- self ._granularity = granularity
365
+ def __init__ (self , name , gf , * args , barrier = None ):
366
+ if name in self .tasks :
367
+ raise ValueError ('Task name "{}" already exists.' .format (name ))
368
+ task = gf (TaskId (name ), * args )
369
+ self .tasks [name ] = [task , barrier ]
370
+ self .task = task
399
371
400
- def ending (self ):
401
- return self ._ending
372
+ def __call__ (self ):
373
+ return self .task
402
374
403
375
def __await__ (self ):
404
- self ._ending = True
405
- while self ._ntasks :
406
- yield from asyncio .sleep_ms (self ._granularity )
407
- self ._ending = False # May want to re-use
376
+ return (yield from self .task )
408
377
409
378
__iter__ = __await__
410
379
411
- async def __aenter__ (self ):
412
- self ._ntasks += 1
413
- await asyncio .sleep_ms (0 )
380
+ # @namedtask
414
381
415
- async def __aexit__ (self , * args ):
416
- self ._ntasks -= 1
417
- await asyncio .sleep_ms (0 )
418
-
419
- # Sleep while checking for premature ending. Return True on normal ending,
420
- # False if premature.
421
- async def sleep (self , t ):
422
- t *= 1000 # ms
423
- granularity = self ._granularity
424
- if t <= granularity :
425
- await asyncio .sleep_ms (t )
426
- else :
427
- n , rem = divmod (t , granularity )
428
- for _ in range (n ):
429
- if self ._ending :
430
- return False
431
- await asyncio .sleep_ms (granularity )
432
- if self ._ending :
433
- return False
434
- await asyncio .sleep_ms (rem )
435
- return not self ._ending
382
+ def namedtask (f ):
383
+ def new_gen (* args ):
384
+ g = f (* args )
385
+ # Task ID is args[1] if a bound method (else args[0])
386
+ idx = 0 if isinstance (args [0 ], TaskId ) else 1
387
+ name = args [idx ]()
388
+ try :
389
+ res = await g
390
+ return res
391
+ except StopTask :
392
+ pass
393
+ finally :
394
+ await NamedTask .end (name )
395
+ return new_gen
0 commit comments