Skip to content

Commit 78d22a4

Browse files
committed
v3 Simplify Barrier class. Improve test and tutorial.
1 parent 68f8ec1 commit 78d22a4

File tree

3 files changed

+114
-112
lines changed

3 files changed

+114
-112
lines changed

v3/docs/TUTORIAL.md

Lines changed: 76 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -650,49 +650,37 @@ Synchronous Methods:
650650
Asynchronous Method:
651651
* `wait` Pause until event is set.
652652

653-
Coros waiting on the event issue `await event.wait()` when execution pauses until
654-
another issues `event.set()`.
655-
656-
This presents a problem if `event.set()` is issued in a looping construct; the
657-
code must wait until the event has been accessed by all waiting tasks before
658-
setting it again. In the case where a single task is awaiting the event this
659-
can be achieved by the receiving task clearing the event:
660-
661-
```python
662-
async def eventwait(event):
663-
await event.wait()
664-
# Process the data
665-
event.clear() # Tell the caller it's ready for more
666-
```
667-
668-
The task raising the event checks that it has been serviced:
669-
670-
```python
671-
async def foo(event):
672-
while True:
673-
# Acquire data from somewhere
674-
while event.is_set():
675-
await asyncio.sleep(1) # Wait for task to be ready
676-
# Data is available to the task, so alert it:
677-
event.set()
678-
```
679-
680-
Where multiple tasks wait on a single event synchronisation can be achieved by
681-
means of an acknowledge event. Each task needs a separate event.
682-
683-
```python
684-
async def eventwait(event, ack_event):
685-
await event.wait()
686-
ack_event.set()
687-
```
688-
689-
This is cumbersome. In most cases - even those with a single waiting task - the
690-
Barrier class offers a simpler approach.
653+
Tasks wait on the event by issuing `await event.wait()`; execution pauses until
654+
another issues `event.set()`. This causes all tasks waiting on the `Event` to
655+
be queued for execution. Note that the synchronous sequence
656+
```python
657+
event.set()
658+
event.clear()
659+
```
660+
will cause waiting task(s) to resume in round-robin order.
661+
662+
The `Event` class is an efficient and effective way to synchronise tasks, but
663+
firmware applications often have multiple tasks running `while True:` loops.
664+
The number of `Event` instances required to synchronise these can multiply.
665+
Consider the case of one producer task feeding N consumers. The producer sets
666+
an `Event` to tell the consumer that data is ready; it then needs to wait until
667+
all consumers have completed before triggering them again. Consider these
668+
approaches:
669+
1. Each consumer sets an `Event` on completion. Producer waits until all
670+
`Event`s are set before clearing them and setting its own `Event`.
671+
2. Consumers do not loop, running to completion. Producer uses `gather` to
672+
instantiate consumer tasks and wait on their completion.
673+
3. `Event`s are replaced with a single [Barrier](./TUTORIAL.md#37-barrier)
674+
instance.
675+
676+
Solution 1 suffers a proliferation of `Event`s and suffers an inefficient
677+
busy-wait where the producer waits on N events. Solution 2 is inefficient with
678+
constant creation of tasks. Arguably the `Barrier` class is the best approach.
691679

692680
**NOTE NOT YET SUPPORTED - see Message class**
693681
An Event can also provide a means of communication between an interrupt handler
694682
and a task. The handler services the hardware and sets an event which is tested
695-
in slow time by the task.
683+
in slow time by the task. See [PR6106](https://github.com/micropython/micropython/pull/6106).
696684

697685
###### [Contents](./TUTORIAL.md#contents)
698686

@@ -888,9 +876,9 @@ asyncio.run(queue_go(4))
888876

889877
## 3.6 Message
890878

891-
This is an unofficial primitive and has no counterpart in CPython asyncio.
879+
This is an unofficial primitive with no counterpart in CPython asyncio.
892880

893-
This is a minor adaptation of the `Event` class. It provides the following:
881+
This is similar to the `Event` class. It provides the following:
894882
* `.set()` has an optional data payload.
895883
* `.set()` is capable of being called from a hard or soft interrupt service
896884
routine - a feature not yet available in the more efficient official `Event`.
@@ -927,11 +915,14 @@ async def main():
927915

928916
asyncio.run(main())
929917
```
930-
931918
A `Message` can provide a means of communication between an interrupt handler
932919
and a task. The handler services the hardware and issues `.set()` which is
933920
tested in slow time by the task.
934921

922+
Currently its behaviour differs from that of `Event` where multiple tasks wait
923+
on a `Message`. This may change: it is therefore recommended to use `Message`
924+
instances with only one receiving task.
925+
935926
###### [Contents](./TUTORIAL.md#contents)
936927

937928
## 3.7 Barrier
@@ -964,6 +955,47 @@ would imply instantiating a set of tasks on every pass of the loop.
964955
passing a barrier does not imply return. `Barrier` now has an efficient
965956
implementation using `Event` to suspend waiting tasks.
966957

958+
The following is a typical usage example. A data provider acquires data from
959+
some hardware and transmits it concurrently on a number of interefaces. These
960+
run at different speeds. The `Barrier` synchronises these loops. This can run
961+
on a Pyboard.
962+
```python
963+
import uasyncio as asyncio
964+
from primitives.barrier import Barrier
965+
from machine import UART
966+
import ujson
967+
968+
data = None
969+
async def provider(barrier):
970+
global data
971+
n = 0
972+
while True:
973+
n += 1 # Get data from some source
974+
data = ujson.dumps([n, 'the quick brown fox jumps over the lazy dog'])
975+
print('Provider triggers senders')
976+
await barrier # Free sender tasks
977+
print('Provider waits for last sender to complete')
978+
await barrier
979+
980+
async def sender(barrier, swriter, n):
981+
while True:
982+
await barrier # Provider has got data
983+
swriter.write(data)
984+
await swriter.drain()
985+
print('UART', n, 'sent', data)
986+
await barrier # Trigger provider when last sender has completed
987+
988+
async def main():
989+
sw1 = asyncio.StreamWriter(UART(1, 9600), {})
990+
sw2 = asyncio.StreamWriter(UART(2, 1200), {})
991+
barrier = Barrier(3)
992+
for n, sw in enumerate((sw1, sw2)):
993+
asyncio.create_task(sender(barrier, sw, n + 1))
994+
await provider(barrier)
995+
996+
asyncio.run(main())
997+
```
998+
967999
Constructor.
9681000
Mandatory arg:
9691001
* `participants` The number of coros which will use the barrier.
@@ -972,8 +1004,8 @@ Optional args:
9721004
* `args` Tuple of args for the callback. Default `()`.
9731005

9741006
Public synchronous methods:
975-
* `busy` No args. Returns `True` if at least one coro is waiting on the
976-
barrier, or if at least one non-waiting coro has not triggered it.
1007+
* `busy` No args. Returns `True` if at least one task is waiting on the
1008+
barrier.
9771009
* `trigger` No args. The barrier records that the coro has passed the critical
9781010
point. Returns "immediately".
9791011
* `result` No args. If a callback was provided, returns the return value from
@@ -1000,36 +1032,6 @@ passed the barrier, and all waiting coros have reached it. At that point all
10001032
waiting coros will resume. A non-waiting coro issues `barrier.trigger()` to
10011033
indicate that is has passed the critical point.
10021034

1003-
```python
1004-
import uasyncio as asyncio
1005-
from uasyncio import Event
1006-
from primitives.barrier import Barrier
1007-
1008-
def callback(text):
1009-
print(text)
1010-
1011-
async def report(num, barrier, event):
1012-
for i in range(5):
1013-
# De-synchronise for demo
1014-
await asyncio.sleep_ms(num * 50)
1015-
print('{} '.format(i), end='')
1016-
await barrier
1017-
event.set()
1018-
1019-
async def main():
1020-
barrier = Barrier(3, callback, ('Synch',))
1021-
event = Event()
1022-
for num in range(3):
1023-
asyncio.create_task(report(num, barrier, event))
1024-
await event.wait()
1025-
1026-
asyncio.run(main())
1027-
```
1028-
1029-
multiple instances of `report` print their result and pause until the other
1030-
instances are also complete and waiting on `barrier`. At that point the
1031-
callback runs. On its completion the tasks resume.
1032-
10331035
###### [Contents](./TUTORIAL.md#contents)
10341036

10351037
## 3.8 Delay_ms class

v3/primitives/barrier.py

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,35 @@
1919
class Barrier():
2020
def __init__(self, participants, func=None, args=()):
2121
self._participants = participants
22+
self._count = participants
2223
self._func = func
2324
self._args = args
24-
self._reset(True)
2525
self._res = None
2626
self._evt = asyncio.Event()
2727

2828
def __await__(self):
2929
if self.trigger():
30-
return
31-
32-
direction = self._down
33-
while True: # Wait until last waiting task changes the direction
34-
if direction != self._down:
35-
return
36-
await self._evt.wait()
37-
self._evt.clear()
30+
return # Other tasks have already reached barrier
31+
await self._evt.wait() # Wait until last task reaches it
3832

3933
__iter__ = __await__
4034

4135
def result(self):
4236
return self._res
4337

4438
def trigger(self):
45-
self._count += -1 if self._down else 1
46-
if self._count < 0 or self._count > self._participants:
39+
self._count -=1
40+
if self._count < 0:
4741
raise ValueError('Too many tasks accessing Barrier')
48-
self._evt.set()
49-
if self._at_limit(): # All other tasks are also at limit
50-
if self._func is not None:
51-
self._res = launch(self._func, self._args)
52-
self._reset(not self._down) # Toggle direction to release others
53-
return True
54-
return False
55-
56-
def _reset(self, down):
57-
self._down = down
58-
self._count = self._participants if down else 0
42+
if self._count > 0:
43+
return False # At least 1 other task has not reached barrier
44+
# All other tasks are waiting
45+
if self._func is not None:
46+
self._res = launch(self._func, self._args)
47+
self._count = self._participants
48+
self._evt.set() # Release others
49+
self._evt.clear()
50+
return True
5951

6052
def busy(self):
61-
if self._down:
62-
done = self._count == self._participants
63-
else:
64-
done = self._count == 0
65-
return not done
66-
67-
def _at_limit(self): # Has count reached up or down limit?
68-
limit = 0 if self._down else self._participants
69-
return self._count == limit
53+
return self._count < self._participants

v3/primitives/tests/asyntest.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def print_tests():
2323
st = '''Available functions:
2424
test(0) Print this list.
2525
test(1) Test message acknowledge.
26-
test(2) Test Messge and Lock objects.
26+
test(2) Test Message and Lock objects.
2727
test(3) Test the Barrier class with callback.
2828
test(4) Test the Barrier class with coroutine.
2929
test(5) Test Semaphore
@@ -175,17 +175,29 @@ async def report(barrier):
175175
print('{} '.format(i), end='')
176176
await barrier
177177

178+
async def do_barrier_test():
179+
barrier = Barrier(3, callback, ('Synch',))
180+
for _ in range(2):
181+
for _ in range(3):
182+
asyncio.create_task(report(barrier))
183+
await asyncio.sleep(1)
184+
print()
185+
await asyncio.sleep(1)
186+
178187
def barrier_test():
179-
printexp('''0 0 0 Synch
188+
printexp('''Running (runtime = 3s):
189+
0 0 0 Synch
180190
1 1 1 Synch
181191
2 2 2 Synch
182192
3 3 3 Synch
183193
4 4 4 Synch
184-
''')
185-
barrier = Barrier(3, callback, ('Synch',))
186-
for _ in range(3):
187-
asyncio.create_task(report(barrier))
188-
asyncio.run(killer(2))
194+
195+
1 1 1 Synch
196+
2 2 2 Synch
197+
3 3 3 Synch
198+
4 4 4 Synch
199+
''', 3)
200+
asyncio.run(do_barrier_test())
189201

190202
# ************ Barrier test 1 ************
191203

@@ -208,7 +220,11 @@ async def bart():
208220
barrier = Barrier(4, my_coro, ('my_coro running',))
209221
for x in range(3):
210222
asyncio.create_task(report1(barrier, x))
223+
await asyncio.sleep(4)
224+
assert barrier.busy()
211225
await barrier
226+
await asyncio.sleep(0)
227+
assert not barrier.busy()
212228
# Must yield before reading result(). Here we wait long enough for
213229
await asyncio.sleep_ms(1500) # coro to print
214230
barrier.result().cancel()

0 commit comments

Comments
 (0)