Skip to content

Commit 68f8ec1

Browse files
committed
v3/primitives/queue.py Bugfix - asyntest.py tests Queue more thoroughly.
1 parent d76d075 commit 68f8ec1

File tree

2 files changed

+72
-14
lines changed

2 files changed

+72
-14
lines changed

v3/primitives/queue.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ def __init__(self, maxsize=0):
2727
self._evget = asyncio.Event() # Triggered by get, tested by put
2828

2929
def _get(self):
30-
self._evget.set()
30+
self._evget.set() # Schedule all tasks waiting on get
31+
self._evget.clear()
3132
return self._queue.pop(0)
3233

3334
async def get(self): # Usage: item = await queue.get()
3435
while self.empty(): # May be multiple tasks waiting on get()
3536
# Queue is empty, suspend task until a put occurs
3637
# 1st of N tasks gets, the rest loop again
37-
self._evput.clear()
3838
await self._evput.wait()
3939
return self._get()
4040

@@ -45,13 +45,13 @@ def get_nowait(self): # Remove and return an item from the queue.
4545
return self._get()
4646

4747
def _put(self, val):
48-
self._evput.set()
48+
self._evput.set() # Schedule tasks waiting on put
49+
self._evput.clear()
4950
self._queue.append(val)
5051

5152
async def put(self, val): # Usage: await queue.put(item)
5253
while self.full():
5354
# Queue full
54-
self._evget.clear()
5555
await self._evget.wait()
5656
# Task(s) waiting to get from queue, schedule first Task
5757
self._put(val)

v3/primitives/tests/asyntest.py

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -374,39 +374,97 @@ def condition_test():
374374
# ************ Queue test ************
375375

376376
from primitives.queue import Queue
377-
q = Queue()
378377

379378
async def slow_process():
380379
await asyncio.sleep(2)
381380
return 42
382381

383-
async def bar():
382+
async def bar(q):
384383
print('Waiting for slow process.')
385384
result = await slow_process()
386385
print('Putting result onto queue')
387386
await q.put(result) # Put result on q
388387

389-
async def foo():
388+
async def foo(q):
390389
print("Running foo()")
391390
result = await q.get()
392391
print('Result was {}'.format(result))
393392

394-
async def queue_go(delay):
395-
asyncio.create_task(foo())
396-
asyncio.create_task(bar())
397-
await asyncio.sleep(delay)
393+
async def q_put(n, q):
394+
for x in range(8):
395+
obj = (n, x)
396+
await q.put(obj)
397+
await asyncio.sleep(0)
398+
399+
async def q_get(n, q):
400+
for x in range(8):
401+
await q.get()
402+
await asyncio.sleep(0)
403+
404+
async def putter(q):
405+
# put some item, then sleep
406+
for _ in range(20):
407+
await q.put(1)
408+
await asyncio.sleep_ms(50)
409+
410+
411+
async def getter(q):
412+
# checks for new items, and relies on the "blocking" of the get method
413+
for _ in range(20):
414+
await q.get()
415+
416+
async def queue_go():
417+
q = Queue(10)
418+
asyncio.create_task(foo(q))
419+
asyncio.create_task(bar(q))
420+
await asyncio.sleep(3)
421+
for n in range(4):
422+
asyncio.create_task(q_put(n, q))
423+
await asyncio.sleep(1)
424+
assert q.qsize() == 10
425+
await q.get()
426+
await asyncio.sleep(0.1)
427+
assert q.qsize() == 10
428+
while not q.empty():
429+
await q.get()
430+
await asyncio.sleep(0.1)
431+
assert q.empty()
432+
print('Competing put tasks test complete')
433+
434+
for n in range(4):
435+
asyncio.create_task(q_get(n, q))
436+
await asyncio.sleep(1)
437+
x = 0
438+
while not q.full():
439+
await q.put(x)
440+
await asyncio.sleep(0.3)
441+
x += 1
442+
assert q.qsize() == 10
443+
print('Competing get tasks test complete')
444+
await asyncio.gather(
445+
putter(q),
446+
getter(q)
447+
)
448+
print('Queue tests complete')
398449
print("I've seen starships burn off the shoulder of Orion...")
399450
print("Time to die...")
400451

401452
def queue_test():
402-
printexp('''Running (runtime = 3s):
453+
printexp('''Running (runtime = 20s):
403454
Running foo()
404455
Waiting for slow process.
405456
Putting result onto queue
457+
Result was 42
458+
Competing put tasks test complete
459+
Competing get tasks test complete
460+
Queue tests complete
461+
462+
406463
I've seen starships burn off the shoulder of Orion...
407464
Time to die...
408-
''', 3)
409-
asyncio.run(queue_go(3))
465+
466+
''', 20)
467+
asyncio.run(queue_go())
410468

411469
def test(n):
412470
try:

0 commit comments

Comments
 (0)