From 65b2f75fa30897989d9591b15354a1670209a7f5 Mon Sep 17 00:00:00 2001 From: Michal Moravec Date: Tue, 18 Jun 2024 13:14:36 +0200 Subject: [PATCH 01/28] Update TUTORIAL.md Fixed link to demo iorw.py --- v3/docs/TUTORIAL.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index 68bd663..5a050fd 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -2257,7 +2257,7 @@ class PinCall(io.IOBase): Once again latency can be high: if implemented fast I/O scheduling will improve this. -The demo program [iorw.py](./as_demos/iorw.py) illustrates a complete example. +The demo program [iorw.py](../as_demos/iorw.py) illustrates a complete example. ###### [Contents](./TUTORIAL.md#contents) From af12a3caa7a4ba4059c7e73d940f24a8fef24124 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Tue, 13 Aug 2024 17:40:10 +0100 Subject: [PATCH 02/28] v3/README.md: Fix author notes. --- v3/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v3/README.md b/v3/README.md index d2bd04f..95d7344 100644 --- a/v3/README.md +++ b/v3/README.md @@ -32,11 +32,11 @@ deal with blocking functions. This official tool enables an application to launch a REPL which is active while the application is running. From this you can modify and query the application and run `asyncio` scripts concurrently with the running -application. +application. Author Jim Mussared @jimmo. [aioprof](https://gitlab.com/alelec/aioprof/-/tree/main) A profiler for `asyncio` applications: show the number of calls and the total time used by -each task. Author Matt Trentini. +each task. Author Andrew Leech @andrewleech. [monitor](https://github.com/peterhinch/micropython-monitor) enables a running `asyncio` application to be monitored using a Pi Pico, ideally with a scope or From b30573992dcb35e457e0c25b84410a07b21841bc Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Wed, 28 Aug 2024 11:22:56 +0100 Subject: [PATCH 03/28] Tutorial: Correct error in section 7.6 re select.poll. --- v3/docs/TUTORIAL.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index 5a050fd..d89b86c 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -2420,10 +2420,8 @@ There are two basic approaches to socket programming under `asyncio`. By default sockets block until a specified read or write operation completes. `asyncio` supports blocking sockets by using `select.poll` to prevent them from blocking the scheduler. In most cases it is simplest to use this -mechanism. Example client and server code may be found in the `client_server` -directory. The `userver` application uses `select.poll` explicitly to poll -the server socket. The client sockets use it implicitly in that the `asyncio` -stream mechanism employs it. +mechanism. Note that the `asyncio` stream mechanism employs it. Example client +and server code may be found in the `client_server` directory. Note that `socket.getaddrinfo` currently blocks. The time will be minimal in the example code but if a DNS lookup is required the blocking period could be From 9b6f58d883f45e9ae97c70b122d206f69c7d43fe Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Thu, 5 Sep 2024 13:29:28 +0100 Subject: [PATCH 04/28] SCHEDULE.md: Add note re DST under Unix build. --- v3/docs/SCHEDULE.md | 62 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/v3/docs/SCHEDULE.md b/v3/docs/SCHEDULE.md index ea63cfe..527b73d 100644 --- a/v3/docs/SCHEDULE.md +++ b/v3/docs/SCHEDULE.md @@ -19,13 +19,14 @@ 7. [Use in synchronous code](./SCHEDULE.md#7-use-in-synchronous-code) If you really must. 7.1 [Initialisation](./SCHEDULE.md#71-initialisation)__ 8. [The simulate script](./SCHEDULE.md#8-the-simulate-script) Rapidly test sequences. + 9. [Design note](./SCHEDULE.md#9-design-note) Notes on use under an OS. Release note: 11th Dec 2023 Document astronomy module, allowing scheduling based on Sun and -Moon rise and set times. -23rd Nov 2023 Add asynchronous iterator interface. +Moon rise and set times. +23rd Nov 2023 Add asynchronous iterator interface. 3rd April 2023 Fix issue #100. Where an iterable is passed to `secs`, triggers -must now be at least 10s apart (formerly 2s). +must now be at least 10s apart (formerly 2s). ##### [Tutorial](./TUTORIAL.md#contents) ##### [Main V3 README](../README.md) @@ -285,6 +286,9 @@ and duplicates when they go back. Scheduling those times will fail. A solution is to avoid scheduling the times in your region where this occurs (01.00.00 to 02.00.00 in March and October here). +It is believed that in other respects DST is handled correctly by the OS: see +[Design note](./SCHEDULE.md#9-design-note). + ##### [Top](./SCHEDULE.md#0-contents) ## 4.5 Callback interface @@ -546,3 +550,55 @@ the value of system time when the delay ends. In this instance the start of a sequence is delayed to ensure that the first trigger occurs at 01:00. ##### [Top](./SCHEDULE.md#0-contents) + +# 9. Design note + +This module is primarily intended for use on a microcontroller, where the time +source is a hardware RTC. This is usually set to local time and does not change +for daylight saving time (DST). Changing the system time while running `asyncio` +code is not recommended. + +A [question was raised](https://github.com/peterhinch/micropython-async/pull/126) +regarding the behaviour of the module when running under the Unix build - in +particular whether the module's use of `time.localtime` is correct, because +`.localtime` changes when DST is invoked. To test whether a problem exists, an +attempt was made to write a script whose behaviour under Unix differed from that +on a microcontroller. The latter has no concept of DST or timezone (TZ) so can +be assumed to be free of such bugs. Unless such a reproducer can be found, it +seems that usage under the Unix build should be correct. + +The following test script outputs the time in seconds between two fixed times +separated by two months, the period being chosen to cross a DST boundary here in +the UK. It passed under the following conditions: + +* On a Pyboard. +* On an ESP32. +* On Unix MicroPython. +* On CPython. +* On the Unix build with my laptop's location set to California. Reported time +changed by -7hrs. +* On CPython in California. + +The conclusion is that the OS ensures that DST related errors do not occur. + +```py +from time import mktime, gmtime, localtime +from sys import implementation +cpython = implementation.name == 'cpython' +if cpython: + from time import struct_time + +start = [2024, 9, 5, 11, 49, 2, 3, 249, 1] +sept = round(mktime(struct_time(start))) if cpython else mktime(start) + +end = start[:] +end[1] += 2 # Same time + 2months Crosses DST boundary in the UK + +november = round(mktime(struct_time(end))) if cpython else mktime(end) +print(november - sept) + +if november - sept == 5270400: + print('PASS') +else: + print('FAIL') +``` From c79fb0057e60aef1f0ef1f14e4d745c520dc2337 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 9 Sep 2024 10:35:25 +0100 Subject: [PATCH 05/28] SCHEDULE.md: Correct notes on DST under Unix build. --- v3/docs/SCHEDULE.md | 115 ++++++++++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 42 deletions(-) diff --git a/v3/docs/SCHEDULE.md b/v3/docs/SCHEDULE.md index 527b73d..38bf0a6 100644 --- a/v3/docs/SCHEDULE.md +++ b/v3/docs/SCHEDULE.md @@ -19,9 +19,10 @@ 7. [Use in synchronous code](./SCHEDULE.md#7-use-in-synchronous-code) If you really must. 7.1 [Initialisation](./SCHEDULE.md#71-initialisation)__ 8. [The simulate script](./SCHEDULE.md#8-the-simulate-script) Rapidly test sequences. - 9. [Design note](./SCHEDULE.md#9-design-note) Notes on use under an OS. + 9. [Daylight saving time](./SCHEDULE.md#9-daylight-saving-time) Notes on timezone and DST when running under an OS. -Release note: +Release note: +7th Sep 2024 Document timezone and DST behaviour under Unix build. 11th Dec 2023 Document astronomy module, allowing scheduling based on Sun and Moon rise and set times. 23rd Nov 2023 Add asynchronous iterator interface. @@ -271,23 +272,24 @@ A `cron` call typically takes 270 to 520μs on a Pyboard, but the upper bound depends on the complexity of the time specifiers. On hardware platforms the MicroPython `time` module does not handle daylight -saving time. Scheduled times are relative to system time. This does not apply -to the Unix build where daylight saving needs to be considered. +saving time. Scheduled times are relative to system time. Under the Unix build, +where the locale uses daylight saving, its effects should be considered. See +[Daylight saving time](./SCHEDULE.md#9-daylight-saving-time). ## 4.4 The Unix build Asynchronous use requires `asyncio` V3, so ensure this is installed on the -Linux target. - -The synchronous and asynchronous demos run under the Unix build. The module is -usable on Linux provided the daylight saving time (DST) constraints are met. A -consequence of DST is that there are impossible times when clocks go forward -and duplicates when they go back. Scheduling those times will fail. A solution -is to avoid scheduling the times in your region where this occurs (01.00.00 to -02.00.00 in March and October here). +Linux target. This may be checked with: +```py +import asyncio +asyncio.__version__ +``` +The module uses local time. When running under an OS, local time is affected by +geographical longitude (timezone - TZ) and daylight saving time (DST). The use +of local time avoids TZ issues but has consequences when the underlying time +source changes due to crossing a DST boundary. -It is believed that in other respects DST is handled correctly by the OS: see -[Design note](./SCHEDULE.md#9-design-note). +This is explained in detail in [Daylight saving time](./SCHEDULE.md#9-daylight-saving-time). ##### [Top](./SCHEDULE.md#0-contents) @@ -551,36 +553,53 @@ sequence is delayed to ensure that the first trigger occurs at 01:00. ##### [Top](./SCHEDULE.md#0-contents) -# 9. Design note +# 9. Daylight saving time -This module is primarily intended for use on a microcontroller, where the time -source is a hardware RTC. This is usually set to local time and does not change -for daylight saving time (DST). Changing the system time while running `asyncio` -code is not recommended. - -A [question was raised](https://github.com/peterhinch/micropython-async/pull/126) -regarding the behaviour of the module when running under the Unix build - in -particular whether the module's use of `time.localtime` is correct, because -`.localtime` changes when DST is invoked. To test whether a problem exists, an -attempt was made to write a script whose behaviour under Unix differed from that -on a microcontroller. The latter has no concept of DST or timezone (TZ) so can -be assumed to be free of such bugs. Unless such a reproducer can be found, it -seems that usage under the Unix build should be correct. - -The following test script outputs the time in seconds between two fixed times -separated by two months, the period being chosen to cross a DST boundary here in -the UK. It passed under the following conditions: - -* On a Pyboard. -* On an ESP32. -* On Unix MicroPython. -* On CPython. -* On the Unix build with my laptop's location set to California. Reported time -changed by -7hrs. -* On CPython in California. - -The conclusion is that the OS ensures that DST related errors do not occur. +Thanks are due to @rhermanklink for raising this issue. +This module is primarily intended for use on a microcontroller, where the time +source is a hardware RTC. This is usually set to local time, and must not change +for daylight saving time (DST); on a microcontroller neither this module nor +`asyncio` will work correctly if system time is changed at runtime. Under an OS, +some kind of thaumaturgy enables `asyncio` to tolerate this behaviour. + +Internally the module uses local time (`time.time()` and `time.localtime()`) to +retrieve the current time. Under an OS, in a locale where DST is used, the time +returned by these methods does not increase monotonically but is subject to +sudden changes at a DST boundary. + +A `cron` instance accepts "time now" measured in seconds from the epoch, and +returns the time to wait for the first scheduled event. This wait time is +calculated on the basis of a monotonic local time. Assume that the time is +10:00:00 on 1st August, and the first scheduled event is at 10:00:00 on 1st +November. The `cron` instance will return the time to wait. The application task +waits for that period, but local clocks will have changed so that the time reads +9:00:00. + +The primary application for this module is on microcontrollers. Further, there +are alternatives such as [Python schedule](https://github.com/dbader/schedule) +which are designed to run under an OS. Fixing this would require a timezone +solution; in many cases the application can correct for DST. Consequently this +behaviour has been deemed to be in the "document, don't fix" category. + +The following notes are general observations which may be of interest. + +### The epoch + +The Python `time.time()` method returns the number of seconds since the epoch. +This is computed relative to the system clock; consecutive calls around a DST +change will yield a sudden change (+3600 secs for a +one hour change). +This value may be converted to a time tuple with `time.gmtime(secs)` or with +`time.localtime(secs)`. If UTC and local time differ, for the same value of +`secs` these will produce UTC-relative and localtime-relative tuples. + +Consider `time.mktime()`. This converts a time tuple to a number of seconds +since the epoch. The time difference between a specified time and the epoch is +independent of timezone and DST. The specified time and the epoch are assumed to +be defined in the same (unknown, unspecified) time system. Consequently, if a +delay is defined by the difference between two `mktime()` values, that delay +will be unaffected if a DST change occurs between those two values. This may be +verified with the following script: ```py from time import mktime, gmtime, localtime from sys import implementation @@ -602,3 +621,15 @@ if november - sept == 5270400: else: print('FAIL') ``` +This test passes on the Unix build, under CPython, and on MicroPython on a +microcontroller. It also passes under an OS if the system's local time differs +substantially from UTC. + +The `cron` module returns a time difference between a passed time value and one +produced by `mktime()`: accordingly `cron` takes no account of local time or +DST. If local time is changed while waiting for the period specified by `cron`, +at the end of the delay, clocks measuring local time will indicate an incorrect +time. + +This is only an issue when running under an OS: if it is considered an error, it +should be addressed in application code. From f0054be153a158c69a85273570b66aac768aed36 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 23 Sep 2024 14:41:20 +0100 Subject: [PATCH 06/28] Tutorial: Impove section on Lock primitive. --- v3/docs/TUTORIAL.md | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index d89b86c..49a11f7 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -627,17 +627,28 @@ This guarantees unique access to a shared resource. In the following code sample a `Lock` instance `lock` has been created and is passed to all tasks wishing to access the shared resource. Each task attempts to acquire the lock, pausing execution until it succeeds. +```python +from asyncio import Lock +lock = Lock() +``` +Synchronous methods: + * `locked` No args. Returns `True` if locked. + * `release` No args. Releases the lock. See note below. +Asynchronous method: + * `acquire` No args. Pauses until the lock has been acquired. Use by executing + `await lock.acquire()`. +A task waiting on a lock may be cancelled or may be run subject to a timeout. +The normal way to use a `Lock` is in a context manager: ```python import asyncio from asyncio import Lock async def task(i, lock): while 1: - await lock.acquire() - print("Acquired lock in task", i) - await asyncio.sleep(0.5) - lock.release() + async with lock: + print("Acquired lock in task", i) + await asyncio.sleep(0.5) async def main(): lock = Lock() # The Lock instance @@ -648,26 +659,24 @@ async def main(): asyncio.run(main()) # Run for 10s ``` +Use of a context manager is strongly recommended - otherwise an application must +ensure that `.release` is only ever called when that same task has called +`.locked`. Calling `.release` on an unlocked `Lock` will raise a `ValueError`. +Calling it on a `Lock` which has been locked by another task will cause that +second task to produce a `ValueError` when it attempts to release the `Lock` or +when its context manager exits. Context managers avoid these issues. -Methods: - - * `locked` No args. Returns `True` if locked. - * `release` No args. Releases the lock. - * `acquire` No args. Coro which pauses until the lock has been acquired. Use - by executing `await lock.acquire()`. - -A task waiting on a lock may be cancelled or may be run subject to a timeout. -The normal way to use a `Lock` is in a context manager: - +For the brave the following illustrates use without a CM. ```python import asyncio from asyncio import Lock async def task(i, lock): while 1: - async with lock: - print("Acquired lock in task", i) - await asyncio.sleep(0.5) + await lock.acquire() + print("Acquired lock in task", i) + await asyncio.sleep(0.5) + lock.release() async def main(): lock = Lock() # The Lock instance @@ -678,7 +687,6 @@ async def main(): asyncio.run(main()) # Run for 10s ``` - ###### [Contents](./TUTORIAL.md#contents) ## 3.2 Event From d533d81d942ce46f3480c8879bb445717b6b6bf5 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 23 Sep 2024 14:49:44 +0100 Subject: [PATCH 07/28] Tutorial: Impove section on Lock primitive. --- v3/docs/TUTORIAL.md | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index 49a11f7..26cd472 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -259,7 +259,7 @@ async def bar(x): await asyncio.sleep(1) # Pause 1s async def main(): - tasks = [None] * 3 # For CPython compaibility must store a reference see Note + tasks = [None] * 3 # For CPython compaibility must store a reference see 2.2 Note for x in range(3): tasks[x] = asyncio.create_task(bar(x)) await asyncio.sleep(10) @@ -351,7 +351,7 @@ async def bar(x): await asyncio.sleep(1) # Pause 1s async def main(): - tasks = [None] * 3 # For CPython compaibility must store a reference see Note + tasks = [None] * 3 # For CPython compaibility must store a reference see 2.2 Note for x in range(3): tasks[x] = asyncio.create_task(bar(x)) print('Tasks are running') @@ -621,12 +621,8 @@ The following provides a discussion of the primitives. ## 3.1 Lock -This describes the use of the official `Lock` primitive. - -This guarantees unique access to a shared resource. In the following code -sample a `Lock` instance `lock` has been created and is passed to all tasks -wishing to access the shared resource. Each task attempts to acquire the lock, -pausing execution until it succeeds. +This describes the use of the official `Lock` primitive. This guarantees unique +access to a shared resource. ```python from asyncio import Lock lock = Lock() @@ -639,7 +635,10 @@ Asynchronous method: `await lock.acquire()`. A task waiting on a lock may be cancelled or may be run subject to a timeout. -The normal way to use a `Lock` is in a context manager: +The normal way to use a `Lock` is in a context manager. In the following code +sample a `Lock` instance `lock` has been created and is passed to all tasks +wishing to access the shared resource. Each task attempts to acquire the lock, +pausing execution until it succeeds. ```python import asyncio from asyncio import Lock @@ -652,7 +651,7 @@ async def task(i, lock): async def main(): lock = Lock() # The Lock instance - tasks = [None] * 3 # For CPython compaibility must store a reference see Note + tasks = [None] * 3 # For CPython compaibility must store a reference see 2.2 Note for n in range(1, 4): tasks[n - 1] = asyncio.create_task(task(n, lock)) await asyncio.sleep(10) @@ -680,7 +679,7 @@ async def task(i, lock): async def main(): lock = Lock() # The Lock instance - tasks = [None] * 3 # For CPython compaibility must store a reference see Note + tasks = [None] * 3 # For CPython compaibility must store a reference see 2.2 Note for n in range(1, 4): tasks[n - 1] = asyncio.create_task(task(n, lock)) await asyncio.sleep(10) @@ -911,7 +910,7 @@ async def foo(n, sema): async def main(): sema = Semaphore() - tasks = [None] * 3 # For CPython compaibility must store a reference see Note + tasks = [None] * 3 # For CPython compaibility must store a reference see 2.2 Note for num in range(3): tasks[num] = asyncio.create_task(foo(num, sema)) await asyncio.sleep(2) @@ -1204,7 +1203,7 @@ async def main(): sw1 = asyncio.StreamWriter(UART(1, 9600), {}) sw2 = asyncio.StreamWriter(UART(2, 1200), {}) barrier = Barrier(3) - tasks = [None] * 2 # For CPython compaibility must store a reference see Note + tasks = [None] * 2 # For CPython compaibility must store a reference see 2.2 Note for n, sw in enumerate((sw1, sw2)): tasks[n] = asyncio.create_task(sender(barrier, sw, n + 1)) await provider(barrier) From 5575954a7956f30d370edef2cc8b900a21f6011d Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 23 Sep 2024 14:51:40 +0100 Subject: [PATCH 08/28] Tutorial: Impove section on Lock primitive. --- v3/docs/TUTORIAL.md | 1 + 1 file changed, 1 insertion(+) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index 26cd472..cda901f 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -630,6 +630,7 @@ lock = Lock() Synchronous methods: * `locked` No args. Returns `True` if locked. * `release` No args. Releases the lock. See note below. + Asynchronous method: * `acquire` No args. Pauses until the lock has been acquired. Use by executing `await lock.acquire()`. From 977f0ad2fe64cb8f1c719d4655a0ed83a4d0939e Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 19 Oct 2024 12:31:02 +0100 Subject: [PATCH 09/28] Tutorial: Fix indentation error in ThreadSafe flag example. --- v3/docs/TUTORIAL.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index cda901f..09e0e33 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -1108,14 +1108,14 @@ async def foo(tsf): # Periodically set the ThreadSafeFlag await asyncio.sleep(1) tsf.set() - def ready(tsf, poller): - r = (tsf, POLLIN) - poller.register(*r) +def ready(tsf, poller): + r = (tsf, POLLIN) + poller.register(*r) - def is_rdy(): - return r in poller.ipoll(0) + def is_rdy(): + return r in poller.ipoll(0) - return is_rdy + return is_rdy async def test(): tsf = asyncio.ThreadSafeFlag() From 51f0bc0f82e2a04ef345de30162b57fc5b1b1a57 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 19 Oct 2024 12:38:18 +0100 Subject: [PATCH 10/28] Tutorial: ThreadSafeFlag add clear method. --- v3/docs/TUTORIAL.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index 09e0e33..c54c866 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -1019,8 +1019,9 @@ running in another thread or on another core. It operates in a similar way to * It is self-clearing. * Only one task may wait on the flag. -Synchronous method: +Synchronous methods: * `set` Triggers the flag. Like issuing `set` then `clear` to an `Event`. + * `clear` Unconditionally clear down the flag. Asynchronous method: * `wait` Wait for the flag to be set. If the flag is already set then it From f1fff202516231360251265af2c38702637b0be0 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 19 Oct 2024 14:49:29 +0100 Subject: [PATCH 11/28] primitives/encoder.py: Simplify code. --- v3/primitives/encoder.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/v3/primitives/encoder.py b/v3/primitives/encoder.py index 0f43b87..47d4d49 100644 --- a/v3/primitives/encoder.py +++ b/v3/primitives/encoder.py @@ -1,6 +1,6 @@ # encoder.py Asynchronous driver for incremental quadrature encoder. -# Copyright (c) 2021-2023 Peter Hinch +# Copyright (c) 2021-2024 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file # For an explanation of the design please see @@ -17,19 +17,10 @@ # Raul Kompaß (@rkompass) for suggesting a bugfix here # https://forum.micropython.org/viewtopic.php?f=15&t=9929&p=66175#p66156 +# Now uses ThreadSafeFlag.clear() + import asyncio from machine import Pin -from select import poll, POLLIN - - -def ready(tsf, poller): - r = (tsf, POLLIN) - poller.register(*r) - - def is_rdy(): - return r in poller.ipoll(0) - - return is_rdy class Encoder: @@ -58,7 +49,6 @@ def __init__( if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax): raise ValueError("Incompatible args: must have vmin <= v <= vmax") self._tsf = asyncio.ThreadSafeFlag() - self._tsf_ready = ready(self._tsf, poll()) # Create a ready function trig = Pin.IRQ_RISING | Pin.IRQ_FALLING try: xirq = pin_x.irq(trigger=trig, handler=self._x_cb, hard=True) @@ -90,10 +80,9 @@ async def _run(self, vmin, vmax, div, mod, cb, args): plcv = pcv # Previous value after limits applied delay = self.delay while True: - if delay > 0 and self._tsf_ready(): # Ensure ThreadSafeFlag is clear - await self._tsf.wait() - await self._tsf.wait() - await asyncio.sleep_ms(delay) # Wait for motion to stop. + self._tsf.clear() + await self._tsf.wait() # Wait for an edge. A stopped encoder waits here. + await asyncio.sleep_ms(delay) # Optional rate limit for callback/trig. hv = self._v # Sample hardware (atomic read). if hv == pv: # A change happened but was negated before continue # this got scheduled. Nothing to do. From b4becb1362e90629b1c33719a4f961f06854992e Mon Sep 17 00:00:00 2001 From: peterhinch Date: Sat, 19 Oct 2024 18:29:54 +0100 Subject: [PATCH 12/28] Tutorial: Improve section on polling ThreadSafeFlag. --- v3/docs/TUTORIAL.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index c54c866..e516419 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -1098,7 +1098,9 @@ class which allows multiple tasks to wait on it. ### 3.6.1 Querying a ThreadSafeFlag -The state of a ThreadSafeFlag may be tested as follows: +The `ThreadSafeFlag` class has no equivalent to `Event.is_set`. A synchronous +function which returns the state of a `ThreadSafeFlag` instance may be created +as follows: ```python import asyncio from select import poll, POLLIN @@ -1109,12 +1111,12 @@ async def foo(tsf): # Periodically set the ThreadSafeFlag await asyncio.sleep(1) tsf.set() -def ready(tsf, poller): +def ready(tsf, poller): # Return a function which returns tsf status r = (tsf, POLLIN) poller.register(*r) def is_rdy(): - return r in poller.ipoll(0) + return r in poller.ipoll(0) # Immediate return return is_rdy @@ -1136,9 +1138,12 @@ async def test(): asyncio.run(test()) ``` The `ready` closure returns a nonblocking function which tests the status of a -given flag. In the above example `.wait()` is not called until the flag has been +passed flag. In this example `.wait()` is not called until the flag has been set, consequently `.wait()` returns rapidly. +The `select.poll` mechanism works because `ThreadSafeFlag` is subclassed from +`io.IOBase` and has an `ioctl` method. + ###### [Contents](./TUTORIAL.md#contents) ## 3.7 Barrier From 22a695ea4be888c82bb1608d252440ac749094ce Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 20 Oct 2024 12:15:36 +0100 Subject: [PATCH 13/28] encoder.py: Fix declaration of __anext__. --- v3/primitives/encoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/primitives/encoder.py b/v3/primitives/encoder.py index 47d4d49..ef9b561 100644 --- a/v3/primitives/encoder.py +++ b/v3/primitives/encoder.py @@ -104,7 +104,7 @@ async def _run(self, vmin, vmax, div, mod, cb, args): def __aiter__(self): return self - def __anext__(self): + async def __anext__(self): await self._trig.wait() self._trig.clear() return self._cv From 12f0be66d467d3ce0b392f9e026c230cce15758b Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Wed, 30 Oct 2024 10:33:00 +0000 Subject: [PATCH 14/28] Docs: Add official RingIO class. --- v3/docs/INTERRUPTS.md | 62 +++++++++++++++++++++++++++++++--- v3/docs/THREADING.md | 2 +- v3/primitives/ringbuf_queue.py | 4 ++- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/v3/docs/INTERRUPTS.md b/v3/docs/INTERRUPTS.md index 54dcd70..138a2b0 100644 --- a/v3/docs/INTERRUPTS.md +++ b/v3/docs/INTERRUPTS.md @@ -184,17 +184,69 @@ async def process_data(): await tsf.wait() # Process the data here before waiting for the next interrupt ``` +## 3.4 micropython.RingIO + +This is a byte-oriented circular buffer [documented here] +(https://docs.micropython.org/en/latest/library/micropython.html#micropython.RingIO), +which provides an efficient way to return data from an ISR to an `asyncio` task. +It is implemented in C so performance is high, and supports stream I/O. The +following is a usage example: +```py +import asyncio +from machine import Timer +import micropython +micropython.alloc_emergency_exception_buf(100) + +imu = SomeDevice() # Fictional hardware IMU device + +FRAMESIZE = 8 # Count, x, y, z accel +BUFSIZE = 200 # No. of records. Size allows for up to 200ms of asyncio latency. +rio = micropython.RingIO(FRAMESIZE * BUFSIZE + 1) # RingIO requires an extra byte +count = 0x4000 # Bit14 is "Start of frame" marker. Low bits are a frame counter. + +def cb(_): # Timer callback. Runs at 1KHz. + global count # Frame count + imu.get_accel_irq() # Trigger the device + rio.write(chr(count >> 8)) + rio.write(chr(count & 0xff)) + rio.write(imu.accel.ix) # Device returns bytes objects (length 2) + rio.write(imu.accel.iy) + rio.write(imu.accel.iz) + count += 1 + +async def main(nrecs): + t = Timer(freq=1_000, callback=cb) + sreader = asyncio.StreamReader(rio) + rpb = 100 # Records per block + blocksize = FRAMESIZE * rpb + with open('/sd/imudata', 'wb') as f: + swriter = asyncio.StreamWriter(f, {}) + while nrecs: + data = await sreader.readexactly(blocksize) + swriter.write(data) + await swriter.drain() + nrecs -= rpb + t.deinit() + +asyncio.run(main(1_000)) +``` +In this example data is acquired at a timer-controlled rate of 1KHz, with eight +bytes being written to the `RingIO` every tick. The `main()` task reads the data +stream and writes it out to a file. Similar code was tested on a Pyboard 1.1. -## 3.4 Thread Safe Classes +## 3.5 Other Thread Safe Classes Other classes capable of being used to interface an ISR with `asyncio` are discussed [here](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/THREADING.md), -notably the `ThreadSafeQueue`. +notably the `ThreadSafeQueue`. This ring buffer allows entries to be objects +other than bytes. It supports the asynchronous iterator protocol (rather than +stream I/O) and is written in Python. # 4. Conclusion -The key take-away is that `ThreadSafeFlag` is the only official `asyncio` -construct which can safely be used in an ISR context. Unofficial "thread -safe" classes may also be used. +The `ThreadSafeFlag` and `RingIO` classes are the official `asyncio` constructs +which can safely be used in an ISR context. Unofficial "thread safe" classes may +also be used. Beware of classes such as `Queue` and `RingbufQueue` which are not +thread safe. ###### [Main tutorial](./TUTORIAL.md#contents) diff --git a/v3/docs/THREADING.md b/v3/docs/THREADING.md index f13c343..04c6b34 100644 --- a/v3/docs/THREADING.md +++ b/v3/docs/THREADING.md @@ -340,7 +340,7 @@ instances are required. Attributes of `ThreadSafeQueue`: 1. It is of fixed capacity defined on instantiation. 2. It uses a pre-allocated buffer of user selectable type (`Queue` uses a - dynaically allocated `list`). + dynamically allocated `list`). 3. It is an asynchronous iterator allowing retrieval with `async for`. 4. It provides synchronous "put" and "get" methods. If the queue becomes full (put) or empty (get), behaviour is user definable. The method either blocks or diff --git a/v3/primitives/ringbuf_queue.py b/v3/primitives/ringbuf_queue.py index 65366d3..d2b6f90 100644 --- a/v3/primitives/ringbuf_queue.py +++ b/v3/primitives/ringbuf_queue.py @@ -6,7 +6,9 @@ # API differs from CPython # Uses pre-allocated ring buffer: can use list or array # Asynchronous iterator allowing consumer to use async for -# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded. +# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded - +# this is not thread safe, however the class as a whole is not TS because of its +# use of Event objects. import asyncio From d059887196bc3c87e1edaf1df5b21885ee127a98 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 29 Nov 2024 18:12:33 +0000 Subject: [PATCH 15/28] Primitives: Add Broker class. --- v3/docs/DRIVERS.md | 125 +++++++++++++++++++++++++++-- v3/primitives/__init__.py | 2 + v3/primitives/broker.py | 52 ++++++++++++ v3/primitives/package.json | 1 + v3/primitives/tests/broker_test.py | 101 +++++++++++++++++++++++ 5 files changed, 274 insertions(+), 7 deletions(-) create mode 100644 v3/primitives/broker.py create mode 100644 v3/primitives/tests/broker_test.py diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 75a3ce6..6117cc4 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -26,11 +26,15 @@ MicroPython's `asyncio` when used in a microcontroller context. 6.1 [Encoder class](./DRIVERS.md#61-encoder-class) 7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive. 8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface. - 9. [Additional functions](./DRIVERS.md#9-additional-functions) - 9.1 [launch](./DRIVERS.md#91-launch) Run a coro or callback interchangeably. - 9.2 [set_global_exception](./DRIVERS.md#92-set_global_exception) Simplify debugging with a global exception handler. + 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between + tasks. + 9.1 [Further examples](./DRIVERS.md#91-further-examples) + 9.2 [User agents](./DRIVERS.md#92-user-agents) + 10. [Additional functions](./DRIVERS.md#10-additional-functions) + 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. + 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. -###### [Tutorial](./TUTORIAL.md#contents) +###### [asyncio Tutorial](./TUTORIAL.md#contents) # 1. Introduction @@ -1126,9 +1130,116 @@ finally: ``` ###### [Contents](./DRIVERS.md#0-contents) -# 9. Additional functions +# 9. Message Broker + +This is under development: please check for updates. + +The `Broker` class provides a flexible means of messaging between running tasks. +It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task +publishes to a topic. Any tasks subscribed to that topic will receive the +message. This enables one to one, one to many or many to many messaging. + +A task subscribes to a topic with an `agent`. This is stored by the broker. When +the broker publishes a message, the `agent` of each task subscribed to its topic +will be triggered. In the simplest case the `agent` is a `Queue` instance: the +broker puts the topic and message onto the subscriber's queue for retrieval. + +More advanced agents can perform actions in response to a message, such as +calling a function or launching a `task`. + +Broker methods. All are synchronous, constructor has no args: +* `subscribe(topic, agent)` Passed `agent` will be triggered by messages with a +matching `topic`. +* `unsubscribe(topic, agent)` The `agent` will stop being triggered. +* `publish(topic, message)` All `agent` instances subscribed to `topic` will be +triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue` +agent has become full, in which case data for that queue has been lost. + +The `topic` arg is typically a string but may be any hashable object. A +`message` is an arbitrary Python object. An `agent` may be any of the following: +* `Queue` When a message is received receives 2-tuple `(topic, message)`. +* `function` Called when a message is received. Gets 2 args, topic and message. +* `bound method` Called when a message is received. Gets 2 args, topic and +message. +* `coroutine` Task created when a message is received with 2 args, topic and +message. +* `bound coroutine` Task created when a message is received with 2 args, topic +and message. +* Instance of a user class. See user agents below. +* `Event` Set when a message is received. + +Note that synchronous `agent` instances must run to completion quickly otherwise +the `publish` method will be slowed. + +The following is a simple example: +```py +import asyncio +from primitives import Broker, Queue + +broker = Broker() +queue = Queue() +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + +async def main(): + broker.subscribe("foo_topic", queue) + n = 10 + asyncio.create_task(sender(n)) + print("Letting queue part-fill") + await asyncio.sleep(5) + for _ in range(n): + topic, message = await queue.get() + print(topic, message) + +asyncio.run(main()) +``` +## 9.1 Further examples + +An interesting application is to extend MQTT into the Python code +(see [mqtt_as](https://github.com/peterhinch/micropython-mqtt/tree/master)). +This is as simple as: +```py +async def messages(client): + async for topic, msg, retained in client.queue: + broker.publish(topic.decode(), msg.decode()) +``` +Assuming the MQTT client is subscribed to multiple topics, message strings are +directed to individual tasks each supporting one topic. + +## 9.2 User agents + +An `agent` can be an instance of a user class. The class must be a subclass of +`Agent`, and it must support a synchronous `.put` method. The latter takes two +args, being `topic` and `message`. It should run to completion quickly. + +```py +import asyncio +from primitives import Broker, Agent + +broker = Broker() +class MyAgent(Agent): + def put(sef, topic, message): + print(f"User agent. Topic: {topic} Message: {message}") + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + +async def main(): + broker.subscribe("foo_topic", MyAgent()) + await sender(10) + +asyncio.run(main()) +``` + +###### [Contents](./DRIVERS.md#0-contents) + +# 10. Additional functions -## 9.1 Launch +## 10.1 Launch Import as follows: ```python @@ -1140,7 +1251,7 @@ runs it and returns the callback's return value. If a coro is passed, it is converted to a `task` and run asynchronously. The return value is the `task` instance. A usage example is in `primitives/switch.py`. -## 9.2 set_global_exception +## 10.2 set_global_exception Import as follows: ```python diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index 0431e3c..a6914f9 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -53,6 +53,8 @@ def _handle_exception(loop, context): "RingbufQueue": "ringbuf_queue", "Keyboard": "sw_array", "SwArray": "sw_array", + "Broker": "broker", + "Agent": "broker", } # Copied from uasyncio.__init__.py diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py new file mode 100644 index 0000000..3a0ed20 --- /dev/null +++ b/v3/primitives/broker.py @@ -0,0 +1,52 @@ +# broker.py A message broker for MicroPython + +# Copyright (c) 2024 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +# Inspired by the following +# https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/ + +import asyncio +from primitives import Queue, type_coro + + +class Agent: + pass + + +class Broker(dict): + def subscribe(self, topic, agent): + if not self.get(topic, False): + self[topic] = {agent} + else: + self[topic].add(agent) + + def unsubscribe(self, topic, agent): + try: + self[topic].remove(agent) + if len(self[topic]) == 0: + del self[topic] + except KeyError: + pass # Topic already removed + + def publish(self, topic, message): + agents = self.get(topic, []) + result = True + for agent in agents: + if isinstance(agent, asyncio.Event): + agent.set() + continue + if isinstance(agent, Agent): # User class + agent.put(topic, message) # Must support .put + continue + if isinstance(agent, Queue): + if agent.full(): + result = False + else: + agent.put_nowait((topic, message)) + continue + # agent is function, method, coroutine or bound coroutine + res = agent(topic, message) + if isinstance(res, type_coro): + asyncio.create_task(res) + return result diff --git a/v3/primitives/package.json b/v3/primitives/package.json index 8f7e7a7..2adf5cf 100644 --- a/v3/primitives/package.json +++ b/v3/primitives/package.json @@ -3,6 +3,7 @@ ["primitives/__init__.py", "github:peterhinch/micropython-async/v3/primitives/__init__.py"], ["primitives/aadc.py", "github:peterhinch/micropython-async/v3/primitives/aadc.py"], ["primitives/barrier.py", "github:peterhinch/micropython-async/v3/primitives/barrier.py"], + ["primitives/broker.py", "github:peterhinch/micropython-async/v3/primitives/broker.py"], ["primitives/condition.py", "github:peterhinch/micropython-async/v3/primitives/condition.py"], ["primitives/delay_ms.py", "github:peterhinch/micropython-async/v3/primitives/delay_ms.py"], ["primitives/encoder.py", "github:peterhinch/micropython-async/v3/primitives/encoder.py"], diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py new file mode 100644 index 0000000..cbc70d2 --- /dev/null +++ b/v3/primitives/tests/broker_test.py @@ -0,0 +1,101 @@ +# broker_test.py Test various types of subscriber + +# import primitives.tests.broker_test + +import asyncio +from primitives import Broker, Queue + +broker = Broker() + +# Periodically publish messages to two topics +async def test(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"dogs {x}") + broker.publish("bar_topic", f"rats {x}") + + +# Suscribe via coroutine +async def subs(topic, message): + await asyncio.sleep_ms(100) + print("coroutine", topic, message) + + +# Subscribe via function +def func(topic, message): + print("function", topic, message) + + +# Subscribe via Event + +event = asyncio.Event() + + +async def event_test(): + while True: + await event.wait() + event.clear() + print("Event triggered") + + +class TestClass: + async def fetch_data(self, topic, message): + await asyncio.sleep_ms(100) + print("bound coro", topic, message) + + def get_data(self, topic, message): + print("bound method", topic, message) + + +async def print_queue(q): + while True: + topic, message = await q.get() + print(topic, message) + + +async def main(): + tc = TestClass() + q = Queue(10) + print("Subscribing Event, coroutine, Queue and bound coroutine.") + broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine + broker.subscribe("bar_topic", subs) # Coroutine + broker.subscribe("bar_topic", event) + broker.subscribe("foo_topic", q) + + asyncio.create_task(test(30)) # Publish to topics for 30s + asyncio.create_task(event_test()) + await asyncio.sleep(5) + print() + print("Unsubscribing coroutine") + broker.unsubscribe("bar_topic", subs) + await asyncio.sleep(5) + print() + print("Unsubscribing Event") + broker.unsubscribe("bar_topic", event) + print() + print("Subscribing function") + broker.subscribe("bar_topic", func) + await asyncio.sleep(5) + print() + print("Unsubscribing function") + broker.unsubscribe("bar_topic", func) + print() + print("Unsubscribing bound coroutine") + broker.unsubscribe("foo_topic", tc.fetch_data) # Async method + print() + print("Subscribing method") + broker.subscribe("foo_topic", tc.get_data) # Sync method + await asyncio.sleep(5) + print() + print("Unsubscribing method") + broker.unsubscribe("foo_topic", tc.get_data) # Async method + print("Pause 5s") + await asyncio.sleep(5) + print("Retrieving foo_topic messages from queue") + try: + await asyncio.wait_for(print_queue(q), 5) + except asyncio.TimeoutError: + print("Done") + + +asyncio.run(main()) From 7d0b3d3e48be89720ad2743a8039965f281f059c Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 30 Nov 2024 12:43:47 +0000 Subject: [PATCH 16/28] DRIVERS.md: Broker - clarify Queue becoming full. --- v3/docs/DRIVERS.md | 6 ++++-- v3/docs/PRIMITIVES.md | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 v3/docs/PRIMITIVES.md diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 6117cc4..95218b5 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1132,7 +1132,8 @@ finally: # 9. Message Broker -This is under development: please check for updates. +This is under development: please check for updates. See +[code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py). The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task @@ -1153,7 +1154,8 @@ matching `topic`. * `unsubscribe(topic, agent)` The `agent` will stop being triggered. * `publish(topic, message)` All `agent` instances subscribed to `topic` will be triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue` -agent has become full, in which case data for that queue has been lost. +agent has become full. A `False` value indicates that at least one message has +been lost. The `topic` arg is typically a string but may be any hashable object. A `message` is an arbitrary Python object. An `agent` may be any of the following: diff --git a/v3/docs/PRIMITIVES.md b/v3/docs/PRIMITIVES.md new file mode 100644 index 0000000..30e6e97 --- /dev/null +++ b/v3/docs/PRIMITIVES.md @@ -0,0 +1 @@ +### For historical reasons documentation for primitives may be found [here](./DRIVERS.md). From 709663d632bfe3b378b03607eb30e9d4ad68cfdd Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 1 Dec 2024 11:12:02 +0000 Subject: [PATCH 17/28] broker: Prior to adding args. --- v3/docs/DRIVERS.md | 12 +++++++----- v3/primitives/broker.py | 12 +++++------- v3/primitives/tests/broker_test.py | 13 +++++++++---- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 95218b5..9ee3900 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1134,7 +1134,9 @@ finally: This is under development: please check for updates. See [code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py). - +```py +from primitives import Broker +``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task publishes to a topic. Any tasks subscribed to that topic will receive the @@ -1153,13 +1155,13 @@ Broker methods. All are synchronous, constructor has no args: matching `topic`. * `unsubscribe(topic, agent)` The `agent` will stop being triggered. * `publish(topic, message)` All `agent` instances subscribed to `topic` will be -triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue` -agent has become full. A `False` value indicates that at least one message has -been lost. +triggered, receiving `topic` and `message` args. The method is not threadsafe; +it should not be called from a hard ISR or from another thread. The `topic` arg is typically a string but may be any hashable object. A `message` is an arbitrary Python object. An `agent` may be any of the following: * `Queue` When a message is received receives 2-tuple `(topic, message)`. +* `RingbufQueue` When a message is received receives 2-tuple `(topic, message)`. * `function` Called when a message is received. Gets 2 args, topic and message. * `bound method` Called when a message is received. Gets 2 args, topic and message. @@ -1179,7 +1181,7 @@ import asyncio from primitives import Broker, Queue broker = Broker() -queue = Queue() +queue = Queue() # Or (e.g. RingbufQueue(20)) async def sender(t): for x in range(t): await asyncio.sleep(1) diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index 3a0ed20..35989c0 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -7,7 +7,7 @@ # https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/ import asyncio -from primitives import Queue, type_coro +from primitives import Queue, RingbufQueue, type_coro class Agent: @@ -31,7 +31,6 @@ def unsubscribe(self, topic, agent): def publish(self, topic, message): agents = self.get(topic, []) - result = True for agent in agents: if isinstance(agent, asyncio.Event): agent.set() @@ -39,14 +38,13 @@ def publish(self, topic, message): if isinstance(agent, Agent): # User class agent.put(topic, message) # Must support .put continue - if isinstance(agent, Queue): - if agent.full(): - result = False - else: + if isinstance(agent, Queue) or isinstance(agent, RingbufQueue): + try: agent.put_nowait((topic, message)) + except Exception: # TODO + pass continue # agent is function, method, coroutine or bound coroutine res = agent(topic, message) if isinstance(res, type_coro): asyncio.create_task(res) - return result diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index cbc70d2..8eb15a0 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -3,7 +3,7 @@ # import primitives.tests.broker_test import asyncio -from primitives import Broker, Queue +from primitives import Broker, Queue, RingbufQueue broker = Broker() @@ -56,11 +56,13 @@ async def print_queue(q): async def main(): tc = TestClass() q = Queue(10) - print("Subscribing Event, coroutine, Queue and bound coroutine.") + rq = RingbufQueue(10) + print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.") broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine broker.subscribe("bar_topic", subs) # Coroutine broker.subscribe("bar_topic", event) broker.subscribe("foo_topic", q) + broker.subscribe("bar_topic", rq) asyncio.create_task(test(30)) # Publish to topics for 30s asyncio.create_task(event_test()) @@ -91,11 +93,14 @@ async def main(): broker.unsubscribe("foo_topic", tc.get_data) # Async method print("Pause 5s") await asyncio.sleep(5) - print("Retrieving foo_topic messages from queue") + print("Retrieving foo_topic messages from Queue") try: await asyncio.wait_for(print_queue(q), 5) except asyncio.TimeoutError: - print("Done") + print("Timeout") + print("Retrieving bar_topic messages from RingbufQueue") + async for topic, message in rq: + print(topic, message) asyncio.run(main()) From 596e4636f21fe83f206859afadc5f12042c730df Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 1 Dec 2024 18:23:33 +0000 Subject: [PATCH 18/28] broker.py: Agents can have args. --- v3/docs/DRIVERS.md | 152 +++++++++++++++++++++++------ v3/primitives/broker.py | 41 +++++--- v3/primitives/tests/broker_test.py | 28 ++++-- 3 files changed, 166 insertions(+), 55 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 9ee3900..dd7839f 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -29,7 +29,8 @@ MicroPython's `asyncio` when used in a microcontroller context. 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between tasks. 9.1 [Further examples](./DRIVERS.md#91-further-examples) - 9.2 [User agents](./DRIVERS.md#92-user-agents) + 9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes. + 9.3 [Notes](./DRIVERS.md#93-notes) 10. [Additional functions](./DRIVERS.md#10-additional-functions) 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. @@ -1027,6 +1028,9 @@ def add_item(q, data): # 8. Delay_ms class +```python +from primitives import Delay_ms # delay_ms.py +``` This implements the software equivalent of a retriggerable monostable or a watchdog timer. It has an internal boolean `running` state. When instantiated the `Delay_ms` instance does nothing, with `running` `False` until triggered. @@ -1132,15 +1136,16 @@ finally: # 9. Message Broker -This is under development: please check for updates. See -[code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py). -```py -from primitives import Broker +This is under development: please check for updates. + +```python +from primitives import Broker # broker.py ``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task publishes to a topic. Any tasks subscribed to that topic will receive the -message. This enables one to one, one to many or many to many messaging. +message. This enables one to one, one to many, many to one or many to many +messaging. A task subscribes to a topic with an `agent`. This is stored by the broker. When the broker publishes a message, the `agent` of each task subscribed to its topic @@ -1148,34 +1153,53 @@ will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. More advanced agents can perform actions in response to a message, such as -calling a function or launching a `task`. +calling a function, launching a `task` or lighting an LED. + +#### Broker methods -Broker methods. All are synchronous, constructor has no args: -* `subscribe(topic, agent)` Passed `agent` will be triggered by messages with a -matching `topic`. -* `unsubscribe(topic, agent)` The `agent` will stop being triggered. +All are synchronous. They are not threadsafe so should not be called from a hard +ISR or from another thread. The constructor has no args. +* `subscribe(topic, agent, *args)` Passed `agent` will be triggered by messages +with a matching `topic`. Any additional args will be passed to the `agent` when +it is triggered. +* `unsubscribe(topic, agent, *args)` The `agent` will stop being triggered. If +args were passed on subscription, the same args must be passed. * `publish(topic, message)` All `agent` instances subscribed to `topic` will be -triggered, receiving `topic` and `message` args. The method is not threadsafe; -it should not be called from a hard ISR or from another thread. +triggered, receiving `topic` and `message` plus any further args that were +passed to `subscribe`. The `topic` arg is typically a string but may be any hashable object. A -`message` is an arbitrary Python object. An `agent` may be any of the following: -* `Queue` When a message is received receives 2-tuple `(topic, message)`. -* `RingbufQueue` When a message is received receives 2-tuple `(topic, message)`. -* `function` Called when a message is received. Gets 2 args, topic and message. -* `bound method` Called when a message is received. Gets 2 args, topic and -message. -* `coroutine` Task created when a message is received with 2 args, topic and -message. -* `bound coroutine` Task created when a message is received with 2 args, topic -and message. -* Instance of a user class. See user agents below. +`message` is an arbitrary Python object. + +#### Agent types + +An `agent` may be any of the following: + +* `Queue` When a message is received it receives 2-tuple `(topic, message)`. If +extra args were passed on subscription the queue receives a 3-tuple. +`(topic, message, (args...))`. +* `RingbufQueue` When a message is received it receives 2-tuple `(topic, message)`. +If extra args were passed on subscription it receives a 3-tuple, +`(topic, message, (args...))`. +* `function` Called when a message is received. Args: topic, message plus any +further args. +* `bound method` Called when a message is received. Args: topic, message plus any +further args. +* `coroutine` Converted to a `task` when a message is received. Args: topic, +message plus any further args. +* `bound coroutine` Converted to a `task` when a message is received. Args: topic, +message plus any further args. +* `user_agent` Instance of a user class. See user agents below. * `Event` Set when a message is received. Note that synchronous `agent` instances must run to completion quickly otherwise the `publish` method will be slowed. -The following is a simple example: +#### Broker class variable + +* `Verbose=True` Enables printing of debug messages. + +#### example ```py import asyncio from primitives import Broker, Queue @@ -1212,11 +1236,43 @@ async def messages(client): Assuming the MQTT client is subscribed to multiple topics, message strings are directed to individual tasks each supporting one topic. +The following illustrates a use case for `agent` args. +```py +import asyncio +from primitives import Broker +from machine import Pin +red = Pin("A13", Pin.OUT, value=0) # Pin nos. for Pyboard V1.1 +green = Pin("A14", Pin.OUT, value=0) +broker = Broker() + +async def flash(): + broker.publish("led", 1) + await asyncio.sleep(1) + broker.publish("led", 0) + +def recv(topic, message, led): + led(message) # Light or extinguish an LED + +async def main(): + broker.subscribe("led", recv, red) + broker.subscribe("led", recv, green) + for _ in range(10): + await flash() + await asyncio.sleep(1) + broker.unsubscribe("led", recv, green) # Arg(s) must be passed + for _ in range(3): + await flash() + await asyncio.sleep(1) + +asyncio.run(main()) +``` + ## 9.2 User agents An `agent` can be an instance of a user class. The class must be a subclass of -`Agent`, and it must support a synchronous `.put` method. The latter takes two -args, being `topic` and `message`. It should run to completion quickly. +`Agent`, and it must support a synchronous `.put` method. Arguments are `topic` +and `message`, followed by any further args passed on subscription. The method +should run to completion quickly. ```py import asyncio @@ -1224,8 +1280,8 @@ from primitives import Broker, Agent broker = Broker() class MyAgent(Agent): - def put(sef, topic, message): - print(f"User agent. Topic: {topic} Message: {message}") + def put(sef, topic, message, arg): + print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}") async def sender(t): for x in range(t): @@ -1233,11 +1289,47 @@ async def sender(t): broker.publish("foo_topic", f"test {x}") async def main(): - broker.subscribe("foo_topic", MyAgent()) + broker.subscribe("foo_topic", MyAgent(), 42) await sender(10) asyncio.run(main()) ``` +## 9.3 Notes + +#### The publish/subscribe model + +As in the real world publication carries no guarantee of reception. If at the +time of publication there are no tasks with subscribed `agent` instances, the +message will silently be lost. + +#### agent arguments + +Arguments must be hashable objects. Mutable objects such as lists and +dictionaries are not permitted. If an object can be added to a `set` it is +valid. In general, interfaces such as `Pin` instances are OK. + +#### agent uniqueness + +An `agent` can be subscribed to multiple `topic`s. An `agent` may be subscribed +to a `topic` multiple times only if each instance has different arguments. + +#### queues + +If a message causes a queue to fill, a message will silently be lost. It is the +responsibility of the subscriber to avoid this. In the case of a `Queue` +instance the lost message is the one causing the overflow. In the case of +`RingbufQueue` the oldest message in the queue is discarded. In some +applications this behaviour is preferable. + +#### exceptions + +An instance of an `agent` objects is owned by a subscribing tasks but is +executed by a publishing task. If a function used as an `agent` throws an +exception, the traceback will point to a `Broker.publish` call. + +The `Broker` class does not throw exceptions. There are a number of non-fatal +conditions which can occur such as a queue overflow or an attempt to unsubscribe +an `agent` twice. The `Broker` will report these if `Broker.Verboase=True`. ###### [Contents](./DRIVERS.md#0-contents) diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index 35989c0..de88c58 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -15,36 +15,45 @@ class Agent: class Broker(dict): - def subscribe(self, topic, agent): - if not self.get(topic, False): - self[topic] = {agent} - else: - self[topic].add(agent) + Verbose = True - def unsubscribe(self, topic, agent): - try: - self[topic].remove(agent) + def subscribe(self, topic, agent, *args): + aa = (agent, args) + if not (t := self.get(topic, False)): + self[topic] = {aa} + else: + if aa in t and Broker.Verbose: + print(f"Duplicate agent {aa} in topic {topic}.") + t.add(aa) + + def unsubscribe(self, topic, agent, *args): + if topic in self: + if (aa := (agent, args)) in self[topic]: + self[topic].remove(aa) + elif Broker.Verbose: + print(f"Unsubscribe agent {aa} from topic {topic} fail: agent not subscribed.") if len(self[topic]) == 0: del self[topic] - except KeyError: - pass # Topic already removed + elif Broker.Verbose: + print(f"Unsubscribe topic {topic} fail: topic not subscribed.") def publish(self, topic, message): agents = self.get(topic, []) - for agent in agents: + for agent, args in agents: if isinstance(agent, asyncio.Event): agent.set() continue if isinstance(agent, Agent): # User class - agent.put(topic, message) # Must support .put + agent.put(topic, message, *args) # Must support .put continue if isinstance(agent, Queue) or isinstance(agent, RingbufQueue): + t = (topic, message, args) try: - agent.put_nowait((topic, message)) - except Exception: # TODO - pass + agent.put_nowait(t if args else t[:2]) + except Exception: # Queue discards current message. RingbufQueue discards oldest + Broker.verbose and print(f"Message lost topic {topic} message {message}") continue # agent is function, method, coroutine or bound coroutine - res = agent(topic, message) + res = agent(topic, message, *args) if isinstance(res, type_coro): asyncio.create_task(res) diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index 8eb15a0..4b01f0a 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -39,9 +39,9 @@ async def event_test(): class TestClass: - async def fetch_data(self, topic, message): + async def fetch_data(self, topic, message, arg1, arg2): await asyncio.sleep_ms(100) - print("bound coro", topic, message) + print("bound coro", topic, message, arg1, arg2) def get_data(self, topic, message): print("bound method", topic, message) @@ -53,16 +53,21 @@ async def print_queue(q): print(topic, message) +async def print_ringbuf_q(q): + async for topic, message, args in q: + print(topic, message, args) + + async def main(): tc = TestClass() q = Queue(10) rq = RingbufQueue(10) print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.") - broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine + broker.subscribe("foo_topic", tc.fetch_data, 1, 42) # Bound coroutine broker.subscribe("bar_topic", subs) # Coroutine broker.subscribe("bar_topic", event) broker.subscribe("foo_topic", q) - broker.subscribe("bar_topic", rq) + broker.subscribe("bar_topic", rq, "args", "added") asyncio.create_task(test(30)) # Publish to topics for 30s asyncio.create_task(event_test()) @@ -83,7 +88,7 @@ async def main(): broker.unsubscribe("bar_topic", func) print() print("Unsubscribing bound coroutine") - broker.unsubscribe("foo_topic", tc.fetch_data) # Async method + broker.unsubscribe("foo_topic", tc.fetch_data, 1, 42) # Async method print() print("Subscribing method") broker.subscribe("foo_topic", tc.get_data) # Sync method @@ -91,16 +96,21 @@ async def main(): print() print("Unsubscribing method") broker.unsubscribe("foo_topic", tc.get_data) # Async method - print("Pause 5s") - await asyncio.sleep(5) + # print("Pause 5s") + # await asyncio.sleep(5) print("Retrieving foo_topic messages from Queue") try: await asyncio.wait_for(print_queue(q), 5) except asyncio.TimeoutError: print("Timeout") print("Retrieving bar_topic messages from RingbufQueue") - async for topic, message in rq: - print(topic, message) + try: + await asyncio.wait_for(print_ringbuf_q(rq), 5) + except asyncio.TimeoutError: + print("Timeout") + print("Check error on invalid unsubscribe") + broker.unsubscribe("rats", "more rats") # Invalid topic + broker.unsubscribe("foo_topic", "rats") # Invalid agent asyncio.run(main()) From dac5b8830818d545dc9db6ce3584c5fe3a4e68d1 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 2 Dec 2024 16:53:33 +0000 Subject: [PATCH 19/28] broker.py: Validate subscriptions. --- v3/docs/DRIVERS.md | 51 ++++++++++++++++-------------- v3/primitives/broker.py | 14 +++++++- v3/primitives/tests/broker_test.py | 15 +++++++-- 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index dd7839f..60d78d7 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1173,14 +1173,10 @@ The `topic` arg is typically a string but may be any hashable object. A #### Agent types -An `agent` may be any of the following: - -* `Queue` When a message is received it receives 2-tuple `(topic, message)`. If -extra args were passed on subscription the queue receives a 3-tuple. -`(topic, message, (args...))`. -* `RingbufQueue` When a message is received it receives 2-tuple `(topic, message)`. -If extra args were passed on subscription it receives a 3-tuple, -`(topic, message, (args...))`. +An `agent` may be an instance of any of the following: + +* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)`. +* `Queue` Received messages are queued as a 2-tuple `(topic, message)`. * `function` Called when a message is received. Args: topic, message plus any further args. * `bound method` Called when a message is received. Args: topic, message plus any @@ -1193,7 +1189,8 @@ message plus any further args. * `Event` Set when a message is received. Note that synchronous `agent` instances must run to completion quickly otherwise -the `publish` method will be slowed. +the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for +further details on queue behaviour. #### Broker class variable @@ -1202,24 +1199,25 @@ the `publish` method will be slowed. #### example ```py import asyncio -from primitives import Broker, Queue +from primitives import Broker, RingbufQueue broker = Broker() -queue = Queue() # Or (e.g. RingbufQueue(20)) +queue = RingbufQueue(20) async def sender(t): for x in range(t): await asyncio.sleep(1) broker.publish("foo_topic", f"test {x}") +async def receiver(): + async for topic, message in queue: + print(topic, message) + async def main(): broker.subscribe("foo_topic", queue) - n = 10 - asyncio.create_task(sender(n)) - print("Letting queue part-fill") - await asyncio.sleep(5) - for _ in range(n): - topic, message = await queue.get() - print(topic, message) + rx = asyncio.create_task(receiver()) + await sender(10) + await asyncio.sleep(2) + rx.cancel() asyncio.run(main()) ``` @@ -1236,7 +1234,8 @@ async def messages(client): Assuming the MQTT client is subscribed to multiple topics, message strings are directed to individual tasks each supporting one topic. -The following illustrates a use case for `agent` args. +The following illustrates a use case for passing args to an `agent` (pin nos. +are for Pyoard 1.1). ```py import asyncio from primitives import Broker @@ -1319,7 +1318,12 @@ If a message causes a queue to fill, a message will silently be lost. It is the responsibility of the subscriber to avoid this. In the case of a `Queue` instance the lost message is the one causing the overflow. In the case of `RingbufQueue` the oldest message in the queue is discarded. In some -applications this behaviour is preferable. +applications this behaviour is preferable. In general `RingbufQueue` is +preferred as it is optimised for microcontroller use and supports retrieval by +an asynchronous iterator. + +If either queue type is subscribed with args, publications will queued as a +3-tuple `(topic, message, (args...))`. There is no obvious use case for this. #### exceptions @@ -1327,9 +1331,10 @@ An instance of an `agent` objects is owned by a subscribing tasks but is executed by a publishing task. If a function used as an `agent` throws an exception, the traceback will point to a `Broker.publish` call. -The `Broker` class does not throw exceptions. There are a number of non-fatal -conditions which can occur such as a queue overflow or an attempt to unsubscribe -an `agent` twice. The `Broker` will report these if `Broker.Verboase=True`. +The `Broker` class throws a `ValueError` if `.subscribe` is called with an +invalid `agent` type. There are a number of non-fatal conditions which can occur +such as a queue overflow or an attempt to unsubscribe an `agent` twice. The +`Broker` will report these if `Broker.Verbose=True`. ###### [Contents](./DRIVERS.md#0-contents) diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index de88c58..c220c85 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -14,10 +14,22 @@ class Agent: pass +def _validate(a): + return ( + isinstance(a, asyncio.Event) + or isinstance(a, Queue) + or isinstance(a, RingbufQueue) + or isinstance(a, Agent) + or callable(a) + ) + + class Broker(dict): Verbose = True def subscribe(self, topic, agent, *args): + if not _validate(agent): + raise ValueError("Invalid agent:", agent) aa = (agent, args) if not (t := self.get(topic, False)): self[topic] = {aa} @@ -51,7 +63,7 @@ def publish(self, topic, message): try: agent.put_nowait(t if args else t[:2]) except Exception: # Queue discards current message. RingbufQueue discards oldest - Broker.verbose and print(f"Message lost topic {topic} message {message}") + Broker.Verbose and print(f"Message lost topic {topic} message {message}") continue # agent is function, method, coroutine or bound coroutine res = agent(topic, message, *args) diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index 4b01f0a..a2c09a3 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -59,6 +59,7 @@ async def print_ringbuf_q(q): async def main(): + Broker.Verbose = False # Suppress q full messages tc = TestClass() q = Queue(10) rq = RingbufQueue(10) @@ -96,8 +97,6 @@ async def main(): print() print("Unsubscribing method") broker.unsubscribe("foo_topic", tc.get_data) # Async method - # print("Pause 5s") - # await asyncio.sleep(5) print("Retrieving foo_topic messages from Queue") try: await asyncio.wait_for(print_queue(q), 5) @@ -108,9 +107,19 @@ async def main(): await asyncio.wait_for(print_ringbuf_q(rq), 5) except asyncio.TimeoutError: print("Timeout") - print("Check error on invalid unsubscribe") + print() + print("*** Testing error reports and exception ***") + print() + Broker.Verbose = True + print("*** Check error on invalid unsubscribe ***") broker.unsubscribe("rats", "more rats") # Invalid topic broker.unsubscribe("foo_topic", "rats") # Invalid agent + print("*** Check exception on invalid subscribe ***") + try: + broker.subscribe("foo_topic", "rubbish_agent") + print("Test FAIL") + except ValueError: + print("Test PASS") asyncio.run(main()) From e6f4a33f3587625515e08c7406017e3fc4db9501 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 15 Dec 2024 18:25:23 +0000 Subject: [PATCH 20/28] Docs: improve Broker coverage. --- v3/docs/DRIVERS.md | 38 ++++++++++++++++++++------------------ v3/docs/TUTORIAL.md | 26 ++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 60d78d7..4511e17 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1136,8 +1136,6 @@ finally: # 9. Message Broker -This is under development: please check for updates. - ```python from primitives import Broker # broker.py ``` @@ -1148,7 +1146,7 @@ message. This enables one to one, one to many, many to one or many to many messaging. A task subscribes to a topic with an `agent`. This is stored by the broker. When -the broker publishes a message, the `agent` of each task subscribed to its topic +the broker publishes a message, every `agent` subscribed to the message topic will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. @@ -1173,18 +1171,20 @@ The `topic` arg is typically a string but may be any hashable object. A #### Agent types -An `agent` may be an instance of any of the following: +An `agent` may be an instance of any of the following types. Args refers to any +arguments passed to the `agent`'s' subscription. -* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)`. +* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)` +assuming no args. * `Queue` Received messages are queued as a 2-tuple `(topic, message)`. -* `function` Called when a message is received. Args: topic, message plus any -further args. -* `bound method` Called when a message is received. Args: topic, message plus any +* `function` Called when a message is received. Args: `topic`, `message` plus any further args. -* `coroutine` Converted to a `task` when a message is received. Args: topic, -message plus any further args. -* `bound coroutine` Converted to a `task` when a message is received. Args: topic, -message plus any further args. +* `bound method` Called when a message is received. Args: `topic`, `message` +plus any further args. +* `coroutine` Converted to a `task` when a message is received. Args: `topic`, +`message` plus any further args. +* `bound coroutine` Converted to a `task` when a message is received. Args: `topic`, +`message` plus any further args. * `user_agent` Instance of a user class. See user agents below. * `Event` Set when a message is received. @@ -1232,7 +1232,8 @@ async def messages(client): broker.publish(topic.decode(), msg.decode()) ``` Assuming the MQTT client is subscribed to multiple topics, message strings are -directed to individual tasks each supporting one topic. +directed to agents, each dedicated to handling a topic. An `agent` might operate +an interface or queue the message for a running task. The following illustrates a use case for passing args to an `agent` (pin nos. are for Pyoard 1.1). @@ -1322,14 +1323,15 @@ applications this behaviour is preferable. In general `RingbufQueue` is preferred as it is optimised for microcontroller use and supports retrieval by an asynchronous iterator. -If either queue type is subscribed with args, publications will queued as a -3-tuple `(topic, message, (args...))`. There is no obvious use case for this. +If either queue type is subscribed with args, a publication will create a queue +entry that is a 3-tuple `(topic, message, (args...))`. There is no obvious use +case for this. #### exceptions -An instance of an `agent` objects is owned by a subscribing tasks but is -executed by a publishing task. If a function used as an `agent` throws an -exception, the traceback will point to a `Broker.publish` call. +An `agent` instance is owned by a subscribing tasks but is executed by a +publishing task. If a function used as an `agent` throws an exception, the +traceback will point to a `Broker.publish` call. The `Broker` class throws a `ValueError` if `.subscribe` is called with an invalid `agent` type. There are a number of non-fatal conditions which can occur diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index e516419..cd24841 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -45,7 +45,8 @@ import uasyncio as asyncio 3.7 [Barrier](./TUTORIAL.md#37-barrier) 3.8 [Delay_ms](./TUTORIAL.md#38-delay_ms-class) Software retriggerable delay. 3.9 [Message](./TUTORIAL.md#39-message) - 3.10 [Synchronising to hardware](./TUTORIAL.md#310-synchronising-to-hardware) + 3.10 [Message broker](./TUTORIAL.md#310-message-broker) A publish-subscribe model of messaging and control. + 3.11 [Synchronising to hardware](./TUTORIAL.md#311-synchronising-to-hardware) Debouncing switches, pushbuttons, ESP32 touchpads and encoder knobs. Taming ADC's. 4. [Designing classes for asyncio](./TUTORIAL.md#4-designing-classes-for-asyncio) 4.1 [Awaitable classes](./TUTORIAL.md#41-awaitable-classes) @@ -589,6 +590,8 @@ following classes which are non-standard, are also in that directory: in a similar (but not identical) way to `gather`. * `Delay_ms` A useful software-retriggerable monostable, akin to a watchdog. Calls a user callback if not cancelled or regularly retriggered. + * `RingbufQueue` a MicroPython-optimised queue. + * `Broker` a means of messaging and control based on a publish/subscribe model. A further set of primitives for synchronising hardware are detailed in [section 3.9](./TUTORIAL.md#39-synchronising-to-hardware). @@ -1280,7 +1283,26 @@ provide an object similar to `Event` with the following differences: It may be found in the `threadsafe` directory and is documented [here](./THREADING.md#32-message). -## 3.10 Synchronising to hardware +## 3.10 Message broker + +A `Broker` is a means of communicating data and/or control within or between +modules. It is typically a single global object, and uses a publish-subscribe +model. A publication comprises a `topic` and a `message`; the latter may be any +Python object. Tasks subscribe to a `topic` via an `agent` object. Whenever a +publication, occurs all `agent` instances currently subscribed to that topic are +triggered. + +An `agent` may be an instance of various types including a function, a coroutine +or a queue. + +A benefit of this approach is that the design of publishing tasks can proceed +independently from that of the subscribers; `agent` instances can be subscribed +and unsubscribed at run time with no effect on the publisher. The publisher +neither knows or cares about the type or number of subscribing `agent`s. + +This is [documented here](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/DRIVERS.md#9-message-broker). + +## 3.11 Synchronising to hardware The following hardware-related classes are documented [here](./DRIVERS.md): * `ESwitch` A debounced switch with an `Event` interface. From 3838e1f74d6c71de6625d2ee2defe6da97b9396c Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 20 Dec 2024 10:35:30 +0000 Subject: [PATCH 21/28] htu21d: I2C address is constructor arg (iss #130) --- v3/as_drivers/htu21d/htu21d_mc.py | 12 ++++++------ v3/docs/HTU21D.md | 7 ++++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/v3/as_drivers/htu21d/htu21d_mc.py b/v3/as_drivers/htu21d/htu21d_mc.py index 5d19ba0..385ec48 100644 --- a/v3/as_drivers/htu21d/htu21d_mc.py +++ b/v3/as_drivers/htu21d/htu21d_mc.py @@ -11,7 +11,6 @@ import asyncio from micropython import const -_ADDRESS = const(0x40) # HTU21D Address _PAUSE_MS = const(60) # HTU21D acquisition delay _READ_USER_REG = const(0xE7) @@ -25,10 +24,11 @@ class HTU21D: START_TEMP_MEASURE = b"\xF3" # Commands START_HUMD_MEASURE = b"\xF5" - def __init__(self, i2c, read_delay=10): + def __init__(self, i2c, read_delay=10, address=0x40): self.i2c = i2c - if _ADDRESS not in self.i2c.scan(): + if address not in self.i2c.scan(): raise OSError("No HTU21D device found.") + self.address = address self.temperature = None self.humidity = None asyncio.create_task(self._run(read_delay)) @@ -46,9 +46,9 @@ def __iter__(self): # Await 1st reading yield from asyncio.sleep(0) async def _get_data(self, cmd, divisor=0x131 << 15, bit=1 << 23): - self.i2c.writeto(_ADDRESS, cmd) # Start reading + self.i2c.writeto(self.address, cmd) # Start reading await asyncio.sleep_ms(_PAUSE_MS) # Wait for device - value = self.i2c.readfrom(_ADDRESS, 3) # Read result, check CRC8 + value = self.i2c.readfrom(self.address, 3) # Read result, check CRC8 data, crc = ustruct.unpack(">HB", value) remainder = (data << 8) | crc while bit > 128: @@ -61,4 +61,4 @@ async def _get_data(self, cmd, divisor=0x131 << 15, bit=1 << 23): return data & 0xFFFC # Clear the status bits def user_register(self): # Read the user register byte (should be 2) - return self.i2c.readfrom_mem(_ADDRESS, _READ_USER_REG, 1)[0] + return self.i2c.readfrom_mem(self.address, _READ_USER_REG, 1)[0] diff --git a/v3/docs/HTU21D.md b/v3/docs/HTU21D.md index daa29f3..07feb14 100644 --- a/v3/docs/HTU21D.md +++ b/v3/docs/HTU21D.md @@ -52,9 +52,10 @@ import as_drivers.htu21d.htu_test This provides a single class `HTU21D`. Constructor. -This takes two args, `i2c` (mandatory) and an optional `read_delay=10`. The -former must be an initialised I2C bus instance. The `read_delay` (secs) -determines how frequently the data values are updated. +This takes the following args +* `i2c` (mandatory) An initialised I2C bus instance. +* `read_delay=10`. The frequency (secs) at which data values are updated. +* `address=0x40` I2C address of the chip. Public bound values 1. `temperature` Latest value in Celcius. From 88a0446c393098abafb324da6d16d69e01ce3b73 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 29 Dec 2024 12:38:57 +0000 Subject: [PATCH 22/28] DRIVERS.md: Improvements to Broker docs. --- v3/docs/DRIVERS.md | 70 ++++++++++++++++++++++++++-------- v3/primitives/__init__.py | 4 +- v3/primitives/ringbuf_queue.py | 4 +- 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 4511e17..570ce47 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1141,11 +1141,10 @@ from primitives import Broker # broker.py ``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task -publishes to a topic. Any tasks subscribed to that topic will receive the -message. This enables one to one, one to many, many to one or many to many -messaging. +publishes to a topic. Objects subscribed to that topic will receive the message. +This enables one to one, one to many, many to one or many to many messaging. -A task subscribes to a topic with an `agent`. This is stored by the broker. When +A task subscribes to a topic via an `agent`. This is stored by the broker. When the broker publishes a message, every `agent` subscribed to the message topic will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. @@ -1153,10 +1152,15 @@ broker puts the topic and message onto the subscriber's queue for retrieval. More advanced agents can perform actions in response to a message, such as calling a function, launching a `task` or lighting an LED. +Agents may be subscribed and unsubscribed dynamically. The publishing task has +no "knowledge" of the number or type of agents subscribed to a topic. The module +is not threadsafe: `Broker` methods should not be called from a hard ISR or from +another thread. + #### Broker methods -All are synchronous. They are not threadsafe so should not be called from a hard -ISR or from another thread. The constructor has no args. +All are synchronous. +* Constructor This has no args. * `subscribe(topic, agent, *args)` Passed `agent` will be triggered by messages with a matching `topic`. Any additional args will be passed to the `agent` when it is triggered. @@ -1172,21 +1176,21 @@ The `topic` arg is typically a string but may be any hashable object. A #### Agent types An `agent` may be an instance of any of the following types. Args refers to any -arguments passed to the `agent`'s' subscription. +arguments passed to the `agent` on subscription. * `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)` -assuming no args. -* `Queue` Received messages are queued as a 2-tuple `(topic, message)`. +assuming no subscription args - otheriwse `(topic, message, (args...))`. +* `Queue` Received messages are queued as described above. * `function` Called when a message is received. Args: `topic`, `message` plus any -further args. +further subscription args. * `bound method` Called when a message is received. Args: `topic`, `message` plus any further args. * `coroutine` Converted to a `task` when a message is received. Args: `topic`, -`message` plus any further args. +`message` plus any further subscription args. * `bound coroutine` Converted to a `task` when a message is received. Args: `topic`, -`message` plus any further args. -* `user_agent` Instance of a user class. See user agents below. +`message` plus any further subscription args. * `Event` Set when a message is received. +* `user_agent` Instance of a user class. See user agents below. Note that synchronous `agent` instances must run to completion quickly otherwise the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for @@ -1202,18 +1206,18 @@ import asyncio from primitives import Broker, RingbufQueue broker = Broker() -queue = RingbufQueue(20) async def sender(t): for x in range(t): await asyncio.sleep(1) broker.publish("foo_topic", f"test {x}") async def receiver(): + queue = RingbufQueue(20) + broker.subscribe("foo_topic", queue) async for topic, message in queue: print(topic, message) async def main(): - broker.subscribe("foo_topic", queue) rx = asyncio.create_task(receiver()) await sender(10) await asyncio.sleep(2) @@ -1266,6 +1270,40 @@ async def main(): asyncio.run(main()) ``` +A task can wait on multiple topics using a `RingbufQueue`: +```python +import asyncio +from primitives import Broker, RingbufQueue + +broker = Broker() + +async def receiver(): + q = RingbufQueue(10) + broker.subscribe("foo_topic", q) + broker.subscribe("bar_topic", q) + async for topic, message in q: + print(f"Received Topic: {topic} Message: {message}") + + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + broker.publish("bar_topic", f"test {x}") + broker.publish("ignore me", f"test {x}") + + +async def main(): + rx = asyncio.create_task(receiver()) + await sender(10) + await asyncio.sleep(2) + rx.cancel() + + +asyncio.run(main()) +``` +here the `receiver` task waits on two topics. The asynchronous iterator returns +messages as they are published. ## 9.2 User agents @@ -1298,7 +1336,7 @@ asyncio.run(main()) #### The publish/subscribe model -As in the real world publication carries no guarantee of reception. If at the +As in the real world, publication carries no guarantee of readership. If at the time of publication there are no tasks with subscribed `agent` instances, the message will silently be lost. diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index a6914f9..2dabb4d 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -1,6 +1,6 @@ # __init__.py Common functions for uasyncio primitives -# Copyright (c) 2018-2022 Peter Hinch +# Copyright (c) 2018-2024 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file import asyncio @@ -57,7 +57,7 @@ def _handle_exception(loop, context): "Agent": "broker", } -# Copied from uasyncio.__init__.py +# Copied from asyncio.__init__.py # Lazy loader, effectively does: # global attr # from .mod import attr diff --git a/v3/primitives/ringbuf_queue.py b/v3/primitives/ringbuf_queue.py index d2b6f90..eaf7ad3 100644 --- a/v3/primitives/ringbuf_queue.py +++ b/v3/primitives/ringbuf_queue.py @@ -7,8 +7,8 @@ # Uses pre-allocated ring buffer: can use list or array # Asynchronous iterator allowing consumer to use async for # put_nowait QueueFull exception can be ignored allowing oldest data to be discarded - -# this is not thread safe, however the class as a whole is not TS because of its -# use of Event objects. +# this is not thread safe. Nor is the class as a whole TS because of its use of +# Event objects. import asyncio From 6c59025349fa521ebecd9abf8eac93e2cb105225 Mon Sep 17 00:00:00 2001 From: Jeff Otterson Date: Thu, 2 Jan 2025 16:42:16 -0500 Subject: [PATCH 23/28] set dirty to false sooner to mitigate race condition that can happen during rapid updated --- v3/as_drivers/hd44780/alcd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/as_drivers/hd44780/alcd.py b/v3/as_drivers/hd44780/alcd.py index 2b1c76c..908a322 100644 --- a/v3/as_drivers/hd44780/alcd.py +++ b/v3/as_drivers/hd44780/alcd.py @@ -100,9 +100,9 @@ async def runlcd(self): # Periodically check for changed text and update LCD if for row in range(self.rows): if self.dirty[row]: msg = self[row] + self.dirty[row] = False self.lcd_byte(LCD.LCD_LINES[row], LCD.CMD) for thisbyte in msg: self.lcd_byte(ord(thisbyte), LCD.CHR) await asyncio.sleep_ms(0) # Reshedule ASAP - self.dirty[row] = False await asyncio.sleep_ms(20) # Give other coros a look-in From cd56ee035804295d742d36f0adf7101a37e00be7 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 10 Jan 2025 10:19:02 +0000 Subject: [PATCH 24/28] Tutorial: add link to event based programming doc. --- v3/docs/TUTORIAL.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index cd24841..b818d58 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -797,7 +797,9 @@ wa = WaitAll((evt1, evt2)).wait() await wa # Both were triggered ``` -Awaiting `WaitAll` or `WaitAny` may be cancelled or subject to a timeout. +Awaiting `WaitAll` or `WaitAny` may be cancelled or subject to a timeout. These +primitives are documented in +[event baed programming](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/EVENTS.md). ###### [Contents](./TUTORIAL.md#contents) From 197c2b5d72cc7633e4b3176eabdeef532ea09ffd Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 11 Jan 2025 18:08:12 +0000 Subject: [PATCH 25/28] Broker: add wildcard subscriptions. --- v3/docs/DRIVERS.md | 22 ++++++++++++++++++---- v3/primitives/__init__.py | 1 + v3/primitives/broker.py | 17 ++++++++++++++++- v3/primitives/tests/broker_test.py | 30 ++++++++++++++++++------------ 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 570ce47..d1570b8 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -26,11 +26,11 @@ MicroPython's `asyncio` when used in a microcontroller context. 6.1 [Encoder class](./DRIVERS.md#61-encoder-class) 7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive. 8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface. - 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between - tasks. + 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between tasks. 9.1 [Further examples](./DRIVERS.md#91-further-examples) 9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes. - 9.3 [Notes](./DRIVERS.md#93-notes) + 9.3 [Wildcard subscriptions](./DRIVERS.md#93-wildcard-subscriptions) + 9.4 [Notes](./DRIVERS.md#9-notes) 10. [Additional functions](./DRIVERS.md#10-additional-functions) 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. @@ -1332,7 +1332,21 @@ async def main(): asyncio.run(main()) ``` -## 9.3 Notes +## 9.3 Wildcard subscriptions + +In the case of publications whose topics are strings, a single call to +`.subscribe` can subscribe an `agent` to multiple topics. This is by wildcard +matching. By default exact matching is used, however this can be changed to use +regular expressions as in this code fragment: +```py +from primitives import Broker, RegExp +broker.subscribe(RegExp(".*_topic"), some_agent) +``` +In this case `some_agent` would be triggered by publications to `foo_topic` or +`bar_topic` because the string `".*_topic"` matches these by the rules of +regular expressions. + +## 9.4 Notes #### The publish/subscribe model diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index 2dabb4d..fe15c4f 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -55,6 +55,7 @@ def _handle_exception(loop, context): "SwArray": "sw_array", "Broker": "broker", "Agent": "broker", + "RegExp": "broker", } # Copied from asyncio.__init__.py diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index c220c85..4ee9b37 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -8,12 +8,21 @@ import asyncio from primitives import Queue, RingbufQueue, type_coro +import re class Agent: pass +class RegExp: + def __init__(self, re_str): + self.re = re.compile(re_str) + + def matching(self, topic): + return re.match(self.re, topic) is not None + + def _validate(a): return ( isinstance(a, asyncio.Event) @@ -50,7 +59,13 @@ def unsubscribe(self, topic, agent, *args): print(f"Unsubscribe topic {topic} fail: topic not subscribed.") def publish(self, topic, message): - agents = self.get(topic, []) + agents = set() # Agents which are triggered by this topic + if isinstance(topic, str): # Check regexps + # Are any keys RegExp instances? + for regexp in [k for k in self.keys() if isinstance(k, RegExp)]: + if regexp.matching(topic): + agents.update(self[regexp]) # Append matching agents + agents.update(self.get(topic, [])) # Exact match for agent, args in agents: if isinstance(agent, asyncio.Event): agent.set() diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index a2c09a3..50fc87c 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -3,7 +3,7 @@ # import primitives.tests.broker_test import asyncio -from primitives import Broker, Queue, RingbufQueue +from primitives import Broker, Queue, RingbufQueue, RegExp broker = Broker() @@ -49,12 +49,13 @@ def get_data(self, topic, message): async def print_queue(q): while True: - topic, message = await q.get() + topic, message = await asyncio.wait_for(q.get(), 2) print(topic, message) async def print_ringbuf_q(q): - async for topic, message, args in q: + while True: + topic, message, args = await asyncio.wait_for(q.get(), 2) print(topic, message, args) @@ -98,20 +99,19 @@ async def main(): print("Unsubscribing method") broker.unsubscribe("foo_topic", tc.get_data) # Async method print("Retrieving foo_topic messages from Queue") - try: - await asyncio.wait_for(print_queue(q), 5) - except asyncio.TimeoutError: - print("Timeout") print("Retrieving bar_topic messages from RingbufQueue") - try: - await asyncio.wait_for(print_ringbuf_q(rq), 5) - except asyncio.TimeoutError: - print("Timeout") + await asyncio.gather(print_queue(q), print_ringbuf_q(rq), return_exceptions=True) + # Queues are now empty print() + print("*** Unsubscribing queues ***") + broker.unsubscribe("foo_topic", q) + broker.unsubscribe("bar_topic", rq, "args", "added") + print() + print("*** Testing error reports and exception ***") print() Broker.Verbose = True - print("*** Check error on invalid unsubscribe ***") + print("*** Produce warning messages on invalid unsubscribe ***") broker.unsubscribe("rats", "more rats") # Invalid topic broker.unsubscribe("foo_topic", "rats") # Invalid agent print("*** Check exception on invalid subscribe ***") @@ -120,6 +120,12 @@ async def main(): print("Test FAIL") except ValueError: print("Test PASS") + print() + print("*** Test wildcard subscribe ***") + broker.subscribe(RegExp(".*_topic"), func) + broker.publish("FAIL", func) # No match + asyncio.create_task(test(5)) + await asyncio.sleep(10) asyncio.run(main()) From 8e01d4287b716f1513a33397074ae6e085ecbe4c Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 30 Mar 2025 12:28:11 +0100 Subject: [PATCH 26/28] DRIVERS.md: Add note re Delay_ms.deinit(). --- v3/docs/DRIVERS.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index d1570b8..aebb691 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1070,7 +1070,9 @@ Synchronous methods: the `Task` instance. This allows the `Task` to be cancelled or awaited. 6. `callback` args `func=None`, `args=()`. Allows the callable and its args to be assigned, reassigned or disabled at run time. - 7. `deinit` No args. Cancels the running task. See [Object scope](./TUTORIAL.md#44-object-scope). + 7. `deinit` No args. Cancels the running task. To avoid a memory leak this + should be called before allowing a `Delay_ms` object to go out of scope. See + [Object scope](./TUTORIAL.md#44-object-scope). 8. `clear` No args. Clears the `Event` described in `wait` below. 9. `set` No args. Sets the `Event` described in `wait` below. From 4001e28f402b9567d9e01318ba2e30d28151d6d1 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Thu, 8 May 2025 18:35:17 +0100 Subject: [PATCH 27/28] broker.py: Provide instance. Pub message defaults to None. --- v3/docs/DRIVERS.md | 40 ++++++++++++++++-------------- v3/primitives/__init__.py | 1 + v3/primitives/broker.py | 9 ++++--- v3/primitives/tests/broker_test.py | 4 +-- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index aebb691..dcaf218 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1139,14 +1139,15 @@ finally: # 9. Message Broker ```python -from primitives import Broker # broker.py +from primitives import Broker, broker # broker.py ``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task publishes to a topic. Objects subscribed to that topic will receive the message. This enables one to one, one to many, many to one or many to many messaging. -A task subscribes to a topic via an `agent`. This is stored by the broker. When +A task subscribes to a topic via an `agent`: this term describes a set of Python +types which may be used in this role. An `agent` is stored by the broker. When the broker publishes a message, every `agent` subscribed to the message topic will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. @@ -1159,6 +1160,12 @@ no "knowledge" of the number or type of agents subscribed to a topic. The module is not threadsafe: `Broker` methods should not be called from a hard ISR or from another thread. +A `Broker` instance `broker` is provided. Where multiple modules issue +```python +from primitives import broker +``` +all will see the same instance, facilitating message passing between modules. + #### Broker methods All are synchronous. @@ -1168,12 +1175,17 @@ with a matching `topic`. Any additional args will be passed to the `agent` when it is triggered. * `unsubscribe(topic, agent, *args)` The `agent` will stop being triggered. If args were passed on subscription, the same args must be passed. -* `publish(topic, message)` All `agent` instances subscribed to `topic` will be -triggered, receiving `topic` and `message` plus any further args that were -passed to `subscribe`. +* `publish(topic, message=None)` All `agent` instances subscribed to `topic` +will be triggered, receiving `topic` and `message` plus any further args that +were passed to `subscribe`. The `topic` arg is typically a string but may be any hashable object. A -`message` is an arbitrary Python object. +`message` is an arbitrary Python object. Where string topics are used, wildcard +subscriptions are possible. + +#### Broker class variable + +* `Verbose=True` Enables printing of debug messages. #### Agent types @@ -1198,16 +1210,11 @@ Note that synchronous `agent` instances must run to completion quickly otherwise the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for further details on queue behaviour. -#### Broker class variable - -* `Verbose=True` Enables printing of debug messages. - #### example ```py import asyncio -from primitives import Broker, RingbufQueue +from primitives import broker, RingbufQueue -broker = Broker() async def sender(t): for x in range(t): await asyncio.sleep(1) @@ -1245,11 +1252,10 @@ The following illustrates a use case for passing args to an `agent` (pin nos. are for Pyoard 1.1). ```py import asyncio -from primitives import Broker +from primitives import broker from machine import Pin red = Pin("A13", Pin.OUT, value=0) # Pin nos. for Pyboard V1.1 green = Pin("A14", Pin.OUT, value=0) -broker = Broker() async def flash(): broker.publish("led", 1) @@ -1275,9 +1281,8 @@ asyncio.run(main()) A task can wait on multiple topics using a `RingbufQueue`: ```python import asyncio -from primitives import Broker, RingbufQueue +from primitives import broker, RingbufQueue -broker = Broker() async def receiver(): q = RingbufQueue(10) @@ -1316,9 +1321,8 @@ should run to completion quickly. ```py import asyncio -from primitives import Broker, Agent +from primitives import broker, Agent -broker = Broker() class MyAgent(Agent): def put(sef, topic, message, arg): print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}") diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index fe15c4f..ceaad77 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -54,6 +54,7 @@ def _handle_exception(loop, context): "Keyboard": "sw_array", "SwArray": "sw_array", "Broker": "broker", + "broker": "broker", "Agent": "broker", "RegExp": "broker", } diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index 4ee9b37..73072bb 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -1,6 +1,6 @@ # broker.py A message broker for MicroPython -# Copyright (c) 2024 Peter Hinch +# Copyright (c) 2024-2025 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file # Inspired by the following @@ -11,7 +11,7 @@ import re -class Agent: +class Agent: # ABC for user agent pass @@ -58,7 +58,7 @@ def unsubscribe(self, topic, agent, *args): elif Broker.Verbose: print(f"Unsubscribe topic {topic} fail: topic not subscribed.") - def publish(self, topic, message): + def publish(self, topic, message=None): agents = set() # Agents which are triggered by this topic if isinstance(topic, str): # Check regexps # Are any keys RegExp instances? @@ -84,3 +84,6 @@ def publish(self, topic, message): res = agent(topic, message, *args) if isinstance(res, type_coro): asyncio.create_task(res) + + +broker = Broker() diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index 50fc87c..ad1357e 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -3,9 +3,7 @@ # import primitives.tests.broker_test import asyncio -from primitives import Broker, Queue, RingbufQueue, RegExp - -broker = Broker() +from primitives import broker, Queue, RingbufQueue, RegExp # Periodically publish messages to two topics async def test(t): From 84c70cbf49ba3b69ffb59fc8f3cb5e0c9f41d4b0 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Thu, 15 May 2025 10:50:04 +0100 Subject: [PATCH 28/28] INTERRUPTS.md: fix garbled text. --- v3/docs/INTERRUPTS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/docs/INTERRUPTS.md b/v3/docs/INTERRUPTS.md index 138a2b0..a2c684c 100644 --- a/v3/docs/INTERRUPTS.md +++ b/v3/docs/INTERRUPTS.md @@ -8,7 +8,7 @@ interrupts in `asyncio` applications. Writing an interrupt service routine (ISR) requires care: see the [official docs](https://docs.micropython.org/en/latest/reference/isr_rules.html). There are restrictions (detailed below) on the way an ISR can interface with -`asyncio`. Finally, on many platformasyncioupts are a limited resource. In +`asyncio`. Finally, on many platforms interrupts are a limited resource. In short interrupts are extremely useful but, if a practical alternative exists, it should be seriously considered.