From 977f0ad2fe64cb8f1c719d4655a0ed83a4d0939e Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 19 Oct 2024 12:31:02 +0100 Subject: [PATCH 01/20] Tutorial: Fix indentation error in ThreadSafe flag example. --- v3/docs/TUTORIAL.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index cda901f..09e0e33 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -1108,14 +1108,14 @@ async def foo(tsf): # Periodically set the ThreadSafeFlag await asyncio.sleep(1) tsf.set() - def ready(tsf, poller): - r = (tsf, POLLIN) - poller.register(*r) +def ready(tsf, poller): + r = (tsf, POLLIN) + poller.register(*r) - def is_rdy(): - return r in poller.ipoll(0) + def is_rdy(): + return r in poller.ipoll(0) - return is_rdy + return is_rdy async def test(): tsf = asyncio.ThreadSafeFlag() From 51f0bc0f82e2a04ef345de30162b57fc5b1b1a57 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 19 Oct 2024 12:38:18 +0100 Subject: [PATCH 02/20] Tutorial: ThreadSafeFlag add clear method. --- v3/docs/TUTORIAL.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index 09e0e33..c54c866 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -1019,8 +1019,9 @@ running in another thread or on another core. It operates in a similar way to * It is self-clearing. * Only one task may wait on the flag. -Synchronous method: +Synchronous methods: * `set` Triggers the flag. Like issuing `set` then `clear` to an `Event`. + * `clear` Unconditionally clear down the flag. Asynchronous method: * `wait` Wait for the flag to be set. If the flag is already set then it From f1fff202516231360251265af2c38702637b0be0 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 19 Oct 2024 14:49:29 +0100 Subject: [PATCH 03/20] primitives/encoder.py: Simplify code. --- v3/primitives/encoder.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/v3/primitives/encoder.py b/v3/primitives/encoder.py index 0f43b87..47d4d49 100644 --- a/v3/primitives/encoder.py +++ b/v3/primitives/encoder.py @@ -1,6 +1,6 @@ # encoder.py Asynchronous driver for incremental quadrature encoder. -# Copyright (c) 2021-2023 Peter Hinch +# Copyright (c) 2021-2024 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file # For an explanation of the design please see @@ -17,19 +17,10 @@ # Raul Kompaß (@rkompass) for suggesting a bugfix here # https://forum.micropython.org/viewtopic.php?f=15&t=9929&p=66175#p66156 +# Now uses ThreadSafeFlag.clear() + import asyncio from machine import Pin -from select import poll, POLLIN - - -def ready(tsf, poller): - r = (tsf, POLLIN) - poller.register(*r) - - def is_rdy(): - return r in poller.ipoll(0) - - return is_rdy class Encoder: @@ -58,7 +49,6 @@ def __init__( if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax): raise ValueError("Incompatible args: must have vmin <= v <= vmax") self._tsf = asyncio.ThreadSafeFlag() - self._tsf_ready = ready(self._tsf, poll()) # Create a ready function trig = Pin.IRQ_RISING | Pin.IRQ_FALLING try: xirq = pin_x.irq(trigger=trig, handler=self._x_cb, hard=True) @@ -90,10 +80,9 @@ async def _run(self, vmin, vmax, div, mod, cb, args): plcv = pcv # Previous value after limits applied delay = self.delay while True: - if delay > 0 and self._tsf_ready(): # Ensure ThreadSafeFlag is clear - await self._tsf.wait() - await self._tsf.wait() - await asyncio.sleep_ms(delay) # Wait for motion to stop. + self._tsf.clear() + await self._tsf.wait() # Wait for an edge. A stopped encoder waits here. + await asyncio.sleep_ms(delay) # Optional rate limit for callback/trig. hv = self._v # Sample hardware (atomic read). if hv == pv: # A change happened but was negated before continue # this got scheduled. Nothing to do. From b4becb1362e90629b1c33719a4f961f06854992e Mon Sep 17 00:00:00 2001 From: peterhinch Date: Sat, 19 Oct 2024 18:29:54 +0100 Subject: [PATCH 04/20] Tutorial: Improve section on polling ThreadSafeFlag. --- v3/docs/TUTORIAL.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index c54c866..e516419 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -1098,7 +1098,9 @@ class which allows multiple tasks to wait on it. ### 3.6.1 Querying a ThreadSafeFlag -The state of a ThreadSafeFlag may be tested as follows: +The `ThreadSafeFlag` class has no equivalent to `Event.is_set`. A synchronous +function which returns the state of a `ThreadSafeFlag` instance may be created +as follows: ```python import asyncio from select import poll, POLLIN @@ -1109,12 +1111,12 @@ async def foo(tsf): # Periodically set the ThreadSafeFlag await asyncio.sleep(1) tsf.set() -def ready(tsf, poller): +def ready(tsf, poller): # Return a function which returns tsf status r = (tsf, POLLIN) poller.register(*r) def is_rdy(): - return r in poller.ipoll(0) + return r in poller.ipoll(0) # Immediate return return is_rdy @@ -1136,9 +1138,12 @@ async def test(): asyncio.run(test()) ``` The `ready` closure returns a nonblocking function which tests the status of a -given flag. In the above example `.wait()` is not called until the flag has been +passed flag. In this example `.wait()` is not called until the flag has been set, consequently `.wait()` returns rapidly. +The `select.poll` mechanism works because `ThreadSafeFlag` is subclassed from +`io.IOBase` and has an `ioctl` method. + ###### [Contents](./TUTORIAL.md#contents) ## 3.7 Barrier From 22a695ea4be888c82bb1608d252440ac749094ce Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 20 Oct 2024 12:15:36 +0100 Subject: [PATCH 05/20] encoder.py: Fix declaration of __anext__. --- v3/primitives/encoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/primitives/encoder.py b/v3/primitives/encoder.py index 47d4d49..ef9b561 100644 --- a/v3/primitives/encoder.py +++ b/v3/primitives/encoder.py @@ -104,7 +104,7 @@ async def _run(self, vmin, vmax, div, mod, cb, args): def __aiter__(self): return self - def __anext__(self): + async def __anext__(self): await self._trig.wait() self._trig.clear() return self._cv From 12f0be66d467d3ce0b392f9e026c230cce15758b Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Wed, 30 Oct 2024 10:33:00 +0000 Subject: [PATCH 06/20] Docs: Add official RingIO class. --- v3/docs/INTERRUPTS.md | 62 +++++++++++++++++++++++++++++++--- v3/docs/THREADING.md | 2 +- v3/primitives/ringbuf_queue.py | 4 ++- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/v3/docs/INTERRUPTS.md b/v3/docs/INTERRUPTS.md index 54dcd70..138a2b0 100644 --- a/v3/docs/INTERRUPTS.md +++ b/v3/docs/INTERRUPTS.md @@ -184,17 +184,69 @@ async def process_data(): await tsf.wait() # Process the data here before waiting for the next interrupt ``` +## 3.4 micropython.RingIO + +This is a byte-oriented circular buffer [documented here] +(https://docs.micropython.org/en/latest/library/micropython.html#micropython.RingIO), +which provides an efficient way to return data from an ISR to an `asyncio` task. +It is implemented in C so performance is high, and supports stream I/O. The +following is a usage example: +```py +import asyncio +from machine import Timer +import micropython +micropython.alloc_emergency_exception_buf(100) + +imu = SomeDevice() # Fictional hardware IMU device + +FRAMESIZE = 8 # Count, x, y, z accel +BUFSIZE = 200 # No. of records. Size allows for up to 200ms of asyncio latency. +rio = micropython.RingIO(FRAMESIZE * BUFSIZE + 1) # RingIO requires an extra byte +count = 0x4000 # Bit14 is "Start of frame" marker. Low bits are a frame counter. + +def cb(_): # Timer callback. Runs at 1KHz. + global count # Frame count + imu.get_accel_irq() # Trigger the device + rio.write(chr(count >> 8)) + rio.write(chr(count & 0xff)) + rio.write(imu.accel.ix) # Device returns bytes objects (length 2) + rio.write(imu.accel.iy) + rio.write(imu.accel.iz) + count += 1 + +async def main(nrecs): + t = Timer(freq=1_000, callback=cb) + sreader = asyncio.StreamReader(rio) + rpb = 100 # Records per block + blocksize = FRAMESIZE * rpb + with open('/sd/imudata', 'wb') as f: + swriter = asyncio.StreamWriter(f, {}) + while nrecs: + data = await sreader.readexactly(blocksize) + swriter.write(data) + await swriter.drain() + nrecs -= rpb + t.deinit() + +asyncio.run(main(1_000)) +``` +In this example data is acquired at a timer-controlled rate of 1KHz, with eight +bytes being written to the `RingIO` every tick. The `main()` task reads the data +stream and writes it out to a file. Similar code was tested on a Pyboard 1.1. -## 3.4 Thread Safe Classes +## 3.5 Other Thread Safe Classes Other classes capable of being used to interface an ISR with `asyncio` are discussed [here](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/THREADING.md), -notably the `ThreadSafeQueue`. +notably the `ThreadSafeQueue`. This ring buffer allows entries to be objects +other than bytes. It supports the asynchronous iterator protocol (rather than +stream I/O) and is written in Python. # 4. Conclusion -The key take-away is that `ThreadSafeFlag` is the only official `asyncio` -construct which can safely be used in an ISR context. Unofficial "thread -safe" classes may also be used. +The `ThreadSafeFlag` and `RingIO` classes are the official `asyncio` constructs +which can safely be used in an ISR context. Unofficial "thread safe" classes may +also be used. Beware of classes such as `Queue` and `RingbufQueue` which are not +thread safe. ###### [Main tutorial](./TUTORIAL.md#contents) diff --git a/v3/docs/THREADING.md b/v3/docs/THREADING.md index f13c343..04c6b34 100644 --- a/v3/docs/THREADING.md +++ b/v3/docs/THREADING.md @@ -340,7 +340,7 @@ instances are required. Attributes of `ThreadSafeQueue`: 1. It is of fixed capacity defined on instantiation. 2. It uses a pre-allocated buffer of user selectable type (`Queue` uses a - dynaically allocated `list`). + dynamically allocated `list`). 3. It is an asynchronous iterator allowing retrieval with `async for`. 4. It provides synchronous "put" and "get" methods. If the queue becomes full (put) or empty (get), behaviour is user definable. The method either blocks or diff --git a/v3/primitives/ringbuf_queue.py b/v3/primitives/ringbuf_queue.py index 65366d3..d2b6f90 100644 --- a/v3/primitives/ringbuf_queue.py +++ b/v3/primitives/ringbuf_queue.py @@ -6,7 +6,9 @@ # API differs from CPython # Uses pre-allocated ring buffer: can use list or array # Asynchronous iterator allowing consumer to use async for -# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded. +# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded - +# this is not thread safe, however the class as a whole is not TS because of its +# use of Event objects. import asyncio From d059887196bc3c87e1edaf1df5b21885ee127a98 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 29 Nov 2024 18:12:33 +0000 Subject: [PATCH 07/20] Primitives: Add Broker class. --- v3/docs/DRIVERS.md | 125 +++++++++++++++++++++++++++-- v3/primitives/__init__.py | 2 + v3/primitives/broker.py | 52 ++++++++++++ v3/primitives/package.json | 1 + v3/primitives/tests/broker_test.py | 101 +++++++++++++++++++++++ 5 files changed, 274 insertions(+), 7 deletions(-) create mode 100644 v3/primitives/broker.py create mode 100644 v3/primitives/tests/broker_test.py diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 75a3ce6..6117cc4 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -26,11 +26,15 @@ MicroPython's `asyncio` when used in a microcontroller context. 6.1 [Encoder class](./DRIVERS.md#61-encoder-class) 7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive. 8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface. - 9. [Additional functions](./DRIVERS.md#9-additional-functions) - 9.1 [launch](./DRIVERS.md#91-launch) Run a coro or callback interchangeably. - 9.2 [set_global_exception](./DRIVERS.md#92-set_global_exception) Simplify debugging with a global exception handler. + 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between + tasks. + 9.1 [Further examples](./DRIVERS.md#91-further-examples) + 9.2 [User agents](./DRIVERS.md#92-user-agents) + 10. [Additional functions](./DRIVERS.md#10-additional-functions) + 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. + 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. -###### [Tutorial](./TUTORIAL.md#contents) +###### [asyncio Tutorial](./TUTORIAL.md#contents) # 1. Introduction @@ -1126,9 +1130,116 @@ finally: ``` ###### [Contents](./DRIVERS.md#0-contents) -# 9. Additional functions +# 9. Message Broker + +This is under development: please check for updates. + +The `Broker` class provides a flexible means of messaging between running tasks. +It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task +publishes to a topic. Any tasks subscribed to that topic will receive the +message. This enables one to one, one to many or many to many messaging. + +A task subscribes to a topic with an `agent`. This is stored by the broker. When +the broker publishes a message, the `agent` of each task subscribed to its topic +will be triggered. In the simplest case the `agent` is a `Queue` instance: the +broker puts the topic and message onto the subscriber's queue for retrieval. + +More advanced agents can perform actions in response to a message, such as +calling a function or launching a `task`. + +Broker methods. All are synchronous, constructor has no args: +* `subscribe(topic, agent)` Passed `agent` will be triggered by messages with a +matching `topic`. +* `unsubscribe(topic, agent)` The `agent` will stop being triggered. +* `publish(topic, message)` All `agent` instances subscribed to `topic` will be +triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue` +agent has become full, in which case data for that queue has been lost. + +The `topic` arg is typically a string but may be any hashable object. A +`message` is an arbitrary Python object. An `agent` may be any of the following: +* `Queue` When a message is received receives 2-tuple `(topic, message)`. +* `function` Called when a message is received. Gets 2 args, topic and message. +* `bound method` Called when a message is received. Gets 2 args, topic and +message. +* `coroutine` Task created when a message is received with 2 args, topic and +message. +* `bound coroutine` Task created when a message is received with 2 args, topic +and message. +* Instance of a user class. See user agents below. +* `Event` Set when a message is received. + +Note that synchronous `agent` instances must run to completion quickly otherwise +the `publish` method will be slowed. + +The following is a simple example: +```py +import asyncio +from primitives import Broker, Queue + +broker = Broker() +queue = Queue() +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + +async def main(): + broker.subscribe("foo_topic", queue) + n = 10 + asyncio.create_task(sender(n)) + print("Letting queue part-fill") + await asyncio.sleep(5) + for _ in range(n): + topic, message = await queue.get() + print(topic, message) + +asyncio.run(main()) +``` +## 9.1 Further examples + +An interesting application is to extend MQTT into the Python code +(see [mqtt_as](https://github.com/peterhinch/micropython-mqtt/tree/master)). +This is as simple as: +```py +async def messages(client): + async for topic, msg, retained in client.queue: + broker.publish(topic.decode(), msg.decode()) +``` +Assuming the MQTT client is subscribed to multiple topics, message strings are +directed to individual tasks each supporting one topic. + +## 9.2 User agents + +An `agent` can be an instance of a user class. The class must be a subclass of +`Agent`, and it must support a synchronous `.put` method. The latter takes two +args, being `topic` and `message`. It should run to completion quickly. + +```py +import asyncio +from primitives import Broker, Agent + +broker = Broker() +class MyAgent(Agent): + def put(sef, topic, message): + print(f"User agent. Topic: {topic} Message: {message}") + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + +async def main(): + broker.subscribe("foo_topic", MyAgent()) + await sender(10) + +asyncio.run(main()) +``` + +###### [Contents](./DRIVERS.md#0-contents) + +# 10. Additional functions -## 9.1 Launch +## 10.1 Launch Import as follows: ```python @@ -1140,7 +1251,7 @@ runs it and returns the callback's return value. If a coro is passed, it is converted to a `task` and run asynchronously. The return value is the `task` instance. A usage example is in `primitives/switch.py`. -## 9.2 set_global_exception +## 10.2 set_global_exception Import as follows: ```python diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index 0431e3c..a6914f9 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -53,6 +53,8 @@ def _handle_exception(loop, context): "RingbufQueue": "ringbuf_queue", "Keyboard": "sw_array", "SwArray": "sw_array", + "Broker": "broker", + "Agent": "broker", } # Copied from uasyncio.__init__.py diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py new file mode 100644 index 0000000..3a0ed20 --- /dev/null +++ b/v3/primitives/broker.py @@ -0,0 +1,52 @@ +# broker.py A message broker for MicroPython + +# Copyright (c) 2024 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +# Inspired by the following +# https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/ + +import asyncio +from primitives import Queue, type_coro + + +class Agent: + pass + + +class Broker(dict): + def subscribe(self, topic, agent): + if not self.get(topic, False): + self[topic] = {agent} + else: + self[topic].add(agent) + + def unsubscribe(self, topic, agent): + try: + self[topic].remove(agent) + if len(self[topic]) == 0: + del self[topic] + except KeyError: + pass # Topic already removed + + def publish(self, topic, message): + agents = self.get(topic, []) + result = True + for agent in agents: + if isinstance(agent, asyncio.Event): + agent.set() + continue + if isinstance(agent, Agent): # User class + agent.put(topic, message) # Must support .put + continue + if isinstance(agent, Queue): + if agent.full(): + result = False + else: + agent.put_nowait((topic, message)) + continue + # agent is function, method, coroutine or bound coroutine + res = agent(topic, message) + if isinstance(res, type_coro): + asyncio.create_task(res) + return result diff --git a/v3/primitives/package.json b/v3/primitives/package.json index 8f7e7a7..2adf5cf 100644 --- a/v3/primitives/package.json +++ b/v3/primitives/package.json @@ -3,6 +3,7 @@ ["primitives/__init__.py", "github:peterhinch/micropython-async/v3/primitives/__init__.py"], ["primitives/aadc.py", "github:peterhinch/micropython-async/v3/primitives/aadc.py"], ["primitives/barrier.py", "github:peterhinch/micropython-async/v3/primitives/barrier.py"], + ["primitives/broker.py", "github:peterhinch/micropython-async/v3/primitives/broker.py"], ["primitives/condition.py", "github:peterhinch/micropython-async/v3/primitives/condition.py"], ["primitives/delay_ms.py", "github:peterhinch/micropython-async/v3/primitives/delay_ms.py"], ["primitives/encoder.py", "github:peterhinch/micropython-async/v3/primitives/encoder.py"], diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py new file mode 100644 index 0000000..cbc70d2 --- /dev/null +++ b/v3/primitives/tests/broker_test.py @@ -0,0 +1,101 @@ +# broker_test.py Test various types of subscriber + +# import primitives.tests.broker_test + +import asyncio +from primitives import Broker, Queue + +broker = Broker() + +# Periodically publish messages to two topics +async def test(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"dogs {x}") + broker.publish("bar_topic", f"rats {x}") + + +# Suscribe via coroutine +async def subs(topic, message): + await asyncio.sleep_ms(100) + print("coroutine", topic, message) + + +# Subscribe via function +def func(topic, message): + print("function", topic, message) + + +# Subscribe via Event + +event = asyncio.Event() + + +async def event_test(): + while True: + await event.wait() + event.clear() + print("Event triggered") + + +class TestClass: + async def fetch_data(self, topic, message): + await asyncio.sleep_ms(100) + print("bound coro", topic, message) + + def get_data(self, topic, message): + print("bound method", topic, message) + + +async def print_queue(q): + while True: + topic, message = await q.get() + print(topic, message) + + +async def main(): + tc = TestClass() + q = Queue(10) + print("Subscribing Event, coroutine, Queue and bound coroutine.") + broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine + broker.subscribe("bar_topic", subs) # Coroutine + broker.subscribe("bar_topic", event) + broker.subscribe("foo_topic", q) + + asyncio.create_task(test(30)) # Publish to topics for 30s + asyncio.create_task(event_test()) + await asyncio.sleep(5) + print() + print("Unsubscribing coroutine") + broker.unsubscribe("bar_topic", subs) + await asyncio.sleep(5) + print() + print("Unsubscribing Event") + broker.unsubscribe("bar_topic", event) + print() + print("Subscribing function") + broker.subscribe("bar_topic", func) + await asyncio.sleep(5) + print() + print("Unsubscribing function") + broker.unsubscribe("bar_topic", func) + print() + print("Unsubscribing bound coroutine") + broker.unsubscribe("foo_topic", tc.fetch_data) # Async method + print() + print("Subscribing method") + broker.subscribe("foo_topic", tc.get_data) # Sync method + await asyncio.sleep(5) + print() + print("Unsubscribing method") + broker.unsubscribe("foo_topic", tc.get_data) # Async method + print("Pause 5s") + await asyncio.sleep(5) + print("Retrieving foo_topic messages from queue") + try: + await asyncio.wait_for(print_queue(q), 5) + except asyncio.TimeoutError: + print("Done") + + +asyncio.run(main()) From 7d0b3d3e48be89720ad2743a8039965f281f059c Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 30 Nov 2024 12:43:47 +0000 Subject: [PATCH 08/20] DRIVERS.md: Broker - clarify Queue becoming full. --- v3/docs/DRIVERS.md | 6 ++++-- v3/docs/PRIMITIVES.md | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 v3/docs/PRIMITIVES.md diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 6117cc4..95218b5 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1132,7 +1132,8 @@ finally: # 9. Message Broker -This is under development: please check for updates. +This is under development: please check for updates. See +[code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py). The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task @@ -1153,7 +1154,8 @@ matching `topic`. * `unsubscribe(topic, agent)` The `agent` will stop being triggered. * `publish(topic, message)` All `agent` instances subscribed to `topic` will be triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue` -agent has become full, in which case data for that queue has been lost. +agent has become full. A `False` value indicates that at least one message has +been lost. The `topic` arg is typically a string but may be any hashable object. A `message` is an arbitrary Python object. An `agent` may be any of the following: diff --git a/v3/docs/PRIMITIVES.md b/v3/docs/PRIMITIVES.md new file mode 100644 index 0000000..30e6e97 --- /dev/null +++ b/v3/docs/PRIMITIVES.md @@ -0,0 +1 @@ +### For historical reasons documentation for primitives may be found [here](./DRIVERS.md). From 709663d632bfe3b378b03607eb30e9d4ad68cfdd Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 1 Dec 2024 11:12:02 +0000 Subject: [PATCH 09/20] broker: Prior to adding args. --- v3/docs/DRIVERS.md | 12 +++++++----- v3/primitives/broker.py | 12 +++++------- v3/primitives/tests/broker_test.py | 13 +++++++++---- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 95218b5..9ee3900 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1134,7 +1134,9 @@ finally: This is under development: please check for updates. See [code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py). - +```py +from primitives import Broker +``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task publishes to a topic. Any tasks subscribed to that topic will receive the @@ -1153,13 +1155,13 @@ Broker methods. All are synchronous, constructor has no args: matching `topic`. * `unsubscribe(topic, agent)` The `agent` will stop being triggered. * `publish(topic, message)` All `agent` instances subscribed to `topic` will be -triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue` -agent has become full. A `False` value indicates that at least one message has -been lost. +triggered, receiving `topic` and `message` args. The method is not threadsafe; +it should not be called from a hard ISR or from another thread. The `topic` arg is typically a string but may be any hashable object. A `message` is an arbitrary Python object. An `agent` may be any of the following: * `Queue` When a message is received receives 2-tuple `(topic, message)`. +* `RingbufQueue` When a message is received receives 2-tuple `(topic, message)`. * `function` Called when a message is received. Gets 2 args, topic and message. * `bound method` Called when a message is received. Gets 2 args, topic and message. @@ -1179,7 +1181,7 @@ import asyncio from primitives import Broker, Queue broker = Broker() -queue = Queue() +queue = Queue() # Or (e.g. RingbufQueue(20)) async def sender(t): for x in range(t): await asyncio.sleep(1) diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index 3a0ed20..35989c0 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -7,7 +7,7 @@ # https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/ import asyncio -from primitives import Queue, type_coro +from primitives import Queue, RingbufQueue, type_coro class Agent: @@ -31,7 +31,6 @@ def unsubscribe(self, topic, agent): def publish(self, topic, message): agents = self.get(topic, []) - result = True for agent in agents: if isinstance(agent, asyncio.Event): agent.set() @@ -39,14 +38,13 @@ def publish(self, topic, message): if isinstance(agent, Agent): # User class agent.put(topic, message) # Must support .put continue - if isinstance(agent, Queue): - if agent.full(): - result = False - else: + if isinstance(agent, Queue) or isinstance(agent, RingbufQueue): + try: agent.put_nowait((topic, message)) + except Exception: # TODO + pass continue # agent is function, method, coroutine or bound coroutine res = agent(topic, message) if isinstance(res, type_coro): asyncio.create_task(res) - return result diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index cbc70d2..8eb15a0 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -3,7 +3,7 @@ # import primitives.tests.broker_test import asyncio -from primitives import Broker, Queue +from primitives import Broker, Queue, RingbufQueue broker = Broker() @@ -56,11 +56,13 @@ async def print_queue(q): async def main(): tc = TestClass() q = Queue(10) - print("Subscribing Event, coroutine, Queue and bound coroutine.") + rq = RingbufQueue(10) + print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.") broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine broker.subscribe("bar_topic", subs) # Coroutine broker.subscribe("bar_topic", event) broker.subscribe("foo_topic", q) + broker.subscribe("bar_topic", rq) asyncio.create_task(test(30)) # Publish to topics for 30s asyncio.create_task(event_test()) @@ -91,11 +93,14 @@ async def main(): broker.unsubscribe("foo_topic", tc.get_data) # Async method print("Pause 5s") await asyncio.sleep(5) - print("Retrieving foo_topic messages from queue") + print("Retrieving foo_topic messages from Queue") try: await asyncio.wait_for(print_queue(q), 5) except asyncio.TimeoutError: - print("Done") + print("Timeout") + print("Retrieving bar_topic messages from RingbufQueue") + async for topic, message in rq: + print(topic, message) asyncio.run(main()) From 596e4636f21fe83f206859afadc5f12042c730df Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 1 Dec 2024 18:23:33 +0000 Subject: [PATCH 10/20] broker.py: Agents can have args. --- v3/docs/DRIVERS.md | 152 +++++++++++++++++++++++------ v3/primitives/broker.py | 41 +++++--- v3/primitives/tests/broker_test.py | 28 ++++-- 3 files changed, 166 insertions(+), 55 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 9ee3900..dd7839f 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -29,7 +29,8 @@ MicroPython's `asyncio` when used in a microcontroller context. 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between tasks. 9.1 [Further examples](./DRIVERS.md#91-further-examples) - 9.2 [User agents](./DRIVERS.md#92-user-agents) + 9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes. + 9.3 [Notes](./DRIVERS.md#93-notes) 10. [Additional functions](./DRIVERS.md#10-additional-functions) 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. @@ -1027,6 +1028,9 @@ def add_item(q, data): # 8. Delay_ms class +```python +from primitives import Delay_ms # delay_ms.py +``` This implements the software equivalent of a retriggerable monostable or a watchdog timer. It has an internal boolean `running` state. When instantiated the `Delay_ms` instance does nothing, with `running` `False` until triggered. @@ -1132,15 +1136,16 @@ finally: # 9. Message Broker -This is under development: please check for updates. See -[code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py). -```py -from primitives import Broker +This is under development: please check for updates. + +```python +from primitives import Broker # broker.py ``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task publishes to a topic. Any tasks subscribed to that topic will receive the -message. This enables one to one, one to many or many to many messaging. +message. This enables one to one, one to many, many to one or many to many +messaging. A task subscribes to a topic with an `agent`. This is stored by the broker. When the broker publishes a message, the `agent` of each task subscribed to its topic @@ -1148,34 +1153,53 @@ will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. More advanced agents can perform actions in response to a message, such as -calling a function or launching a `task`. +calling a function, launching a `task` or lighting an LED. + +#### Broker methods -Broker methods. All are synchronous, constructor has no args: -* `subscribe(topic, agent)` Passed `agent` will be triggered by messages with a -matching `topic`. -* `unsubscribe(topic, agent)` The `agent` will stop being triggered. +All are synchronous. They are not threadsafe so should not be called from a hard +ISR or from another thread. The constructor has no args. +* `subscribe(topic, agent, *args)` Passed `agent` will be triggered by messages +with a matching `topic`. Any additional args will be passed to the `agent` when +it is triggered. +* `unsubscribe(topic, agent, *args)` The `agent` will stop being triggered. If +args were passed on subscription, the same args must be passed. * `publish(topic, message)` All `agent` instances subscribed to `topic` will be -triggered, receiving `topic` and `message` args. The method is not threadsafe; -it should not be called from a hard ISR or from another thread. +triggered, receiving `topic` and `message` plus any further args that were +passed to `subscribe`. The `topic` arg is typically a string but may be any hashable object. A -`message` is an arbitrary Python object. An `agent` may be any of the following: -* `Queue` When a message is received receives 2-tuple `(topic, message)`. -* `RingbufQueue` When a message is received receives 2-tuple `(topic, message)`. -* `function` Called when a message is received. Gets 2 args, topic and message. -* `bound method` Called when a message is received. Gets 2 args, topic and -message. -* `coroutine` Task created when a message is received with 2 args, topic and -message. -* `bound coroutine` Task created when a message is received with 2 args, topic -and message. -* Instance of a user class. See user agents below. +`message` is an arbitrary Python object. + +#### Agent types + +An `agent` may be any of the following: + +* `Queue` When a message is received it receives 2-tuple `(topic, message)`. If +extra args were passed on subscription the queue receives a 3-tuple. +`(topic, message, (args...))`. +* `RingbufQueue` When a message is received it receives 2-tuple `(topic, message)`. +If extra args were passed on subscription it receives a 3-tuple, +`(topic, message, (args...))`. +* `function` Called when a message is received. Args: topic, message plus any +further args. +* `bound method` Called when a message is received. Args: topic, message plus any +further args. +* `coroutine` Converted to a `task` when a message is received. Args: topic, +message plus any further args. +* `bound coroutine` Converted to a `task` when a message is received. Args: topic, +message plus any further args. +* `user_agent` Instance of a user class. See user agents below. * `Event` Set when a message is received. Note that synchronous `agent` instances must run to completion quickly otherwise the `publish` method will be slowed. -The following is a simple example: +#### Broker class variable + +* `Verbose=True` Enables printing of debug messages. + +#### example ```py import asyncio from primitives import Broker, Queue @@ -1212,11 +1236,43 @@ async def messages(client): Assuming the MQTT client is subscribed to multiple topics, message strings are directed to individual tasks each supporting one topic. +The following illustrates a use case for `agent` args. +```py +import asyncio +from primitives import Broker +from machine import Pin +red = Pin("A13", Pin.OUT, value=0) # Pin nos. for Pyboard V1.1 +green = Pin("A14", Pin.OUT, value=0) +broker = Broker() + +async def flash(): + broker.publish("led", 1) + await asyncio.sleep(1) + broker.publish("led", 0) + +def recv(topic, message, led): + led(message) # Light or extinguish an LED + +async def main(): + broker.subscribe("led", recv, red) + broker.subscribe("led", recv, green) + for _ in range(10): + await flash() + await asyncio.sleep(1) + broker.unsubscribe("led", recv, green) # Arg(s) must be passed + for _ in range(3): + await flash() + await asyncio.sleep(1) + +asyncio.run(main()) +``` + ## 9.2 User agents An `agent` can be an instance of a user class. The class must be a subclass of -`Agent`, and it must support a synchronous `.put` method. The latter takes two -args, being `topic` and `message`. It should run to completion quickly. +`Agent`, and it must support a synchronous `.put` method. Arguments are `topic` +and `message`, followed by any further args passed on subscription. The method +should run to completion quickly. ```py import asyncio @@ -1224,8 +1280,8 @@ from primitives import Broker, Agent broker = Broker() class MyAgent(Agent): - def put(sef, topic, message): - print(f"User agent. Topic: {topic} Message: {message}") + def put(sef, topic, message, arg): + print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}") async def sender(t): for x in range(t): @@ -1233,11 +1289,47 @@ async def sender(t): broker.publish("foo_topic", f"test {x}") async def main(): - broker.subscribe("foo_topic", MyAgent()) + broker.subscribe("foo_topic", MyAgent(), 42) await sender(10) asyncio.run(main()) ``` +## 9.3 Notes + +#### The publish/subscribe model + +As in the real world publication carries no guarantee of reception. If at the +time of publication there are no tasks with subscribed `agent` instances, the +message will silently be lost. + +#### agent arguments + +Arguments must be hashable objects. Mutable objects such as lists and +dictionaries are not permitted. If an object can be added to a `set` it is +valid. In general, interfaces such as `Pin` instances are OK. + +#### agent uniqueness + +An `agent` can be subscribed to multiple `topic`s. An `agent` may be subscribed +to a `topic` multiple times only if each instance has different arguments. + +#### queues + +If a message causes a queue to fill, a message will silently be lost. It is the +responsibility of the subscriber to avoid this. In the case of a `Queue` +instance the lost message is the one causing the overflow. In the case of +`RingbufQueue` the oldest message in the queue is discarded. In some +applications this behaviour is preferable. + +#### exceptions + +An instance of an `agent` objects is owned by a subscribing tasks but is +executed by a publishing task. If a function used as an `agent` throws an +exception, the traceback will point to a `Broker.publish` call. + +The `Broker` class does not throw exceptions. There are a number of non-fatal +conditions which can occur such as a queue overflow or an attempt to unsubscribe +an `agent` twice. The `Broker` will report these if `Broker.Verboase=True`. ###### [Contents](./DRIVERS.md#0-contents) diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index 35989c0..de88c58 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -15,36 +15,45 @@ class Agent: class Broker(dict): - def subscribe(self, topic, agent): - if not self.get(topic, False): - self[topic] = {agent} - else: - self[topic].add(agent) + Verbose = True - def unsubscribe(self, topic, agent): - try: - self[topic].remove(agent) + def subscribe(self, topic, agent, *args): + aa = (agent, args) + if not (t := self.get(topic, False)): + self[topic] = {aa} + else: + if aa in t and Broker.Verbose: + print(f"Duplicate agent {aa} in topic {topic}.") + t.add(aa) + + def unsubscribe(self, topic, agent, *args): + if topic in self: + if (aa := (agent, args)) in self[topic]: + self[topic].remove(aa) + elif Broker.Verbose: + print(f"Unsubscribe agent {aa} from topic {topic} fail: agent not subscribed.") if len(self[topic]) == 0: del self[topic] - except KeyError: - pass # Topic already removed + elif Broker.Verbose: + print(f"Unsubscribe topic {topic} fail: topic not subscribed.") def publish(self, topic, message): agents = self.get(topic, []) - for agent in agents: + for agent, args in agents: if isinstance(agent, asyncio.Event): agent.set() continue if isinstance(agent, Agent): # User class - agent.put(topic, message) # Must support .put + agent.put(topic, message, *args) # Must support .put continue if isinstance(agent, Queue) or isinstance(agent, RingbufQueue): + t = (topic, message, args) try: - agent.put_nowait((topic, message)) - except Exception: # TODO - pass + agent.put_nowait(t if args else t[:2]) + except Exception: # Queue discards current message. RingbufQueue discards oldest + Broker.verbose and print(f"Message lost topic {topic} message {message}") continue # agent is function, method, coroutine or bound coroutine - res = agent(topic, message) + res = agent(topic, message, *args) if isinstance(res, type_coro): asyncio.create_task(res) diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index 8eb15a0..4b01f0a 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -39,9 +39,9 @@ async def event_test(): class TestClass: - async def fetch_data(self, topic, message): + async def fetch_data(self, topic, message, arg1, arg2): await asyncio.sleep_ms(100) - print("bound coro", topic, message) + print("bound coro", topic, message, arg1, arg2) def get_data(self, topic, message): print("bound method", topic, message) @@ -53,16 +53,21 @@ async def print_queue(q): print(topic, message) +async def print_ringbuf_q(q): + async for topic, message, args in q: + print(topic, message, args) + + async def main(): tc = TestClass() q = Queue(10) rq = RingbufQueue(10) print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.") - broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine + broker.subscribe("foo_topic", tc.fetch_data, 1, 42) # Bound coroutine broker.subscribe("bar_topic", subs) # Coroutine broker.subscribe("bar_topic", event) broker.subscribe("foo_topic", q) - broker.subscribe("bar_topic", rq) + broker.subscribe("bar_topic", rq, "args", "added") asyncio.create_task(test(30)) # Publish to topics for 30s asyncio.create_task(event_test()) @@ -83,7 +88,7 @@ async def main(): broker.unsubscribe("bar_topic", func) print() print("Unsubscribing bound coroutine") - broker.unsubscribe("foo_topic", tc.fetch_data) # Async method + broker.unsubscribe("foo_topic", tc.fetch_data, 1, 42) # Async method print() print("Subscribing method") broker.subscribe("foo_topic", tc.get_data) # Sync method @@ -91,16 +96,21 @@ async def main(): print() print("Unsubscribing method") broker.unsubscribe("foo_topic", tc.get_data) # Async method - print("Pause 5s") - await asyncio.sleep(5) + # print("Pause 5s") + # await asyncio.sleep(5) print("Retrieving foo_topic messages from Queue") try: await asyncio.wait_for(print_queue(q), 5) except asyncio.TimeoutError: print("Timeout") print("Retrieving bar_topic messages from RingbufQueue") - async for topic, message in rq: - print(topic, message) + try: + await asyncio.wait_for(print_ringbuf_q(rq), 5) + except asyncio.TimeoutError: + print("Timeout") + print("Check error on invalid unsubscribe") + broker.unsubscribe("rats", "more rats") # Invalid topic + broker.unsubscribe("foo_topic", "rats") # Invalid agent asyncio.run(main()) From dac5b8830818d545dc9db6ce3584c5fe3a4e68d1 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 2 Dec 2024 16:53:33 +0000 Subject: [PATCH 11/20] broker.py: Validate subscriptions. --- v3/docs/DRIVERS.md | 51 ++++++++++++++++-------------- v3/primitives/broker.py | 14 +++++++- v3/primitives/tests/broker_test.py | 15 +++++++-- 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index dd7839f..60d78d7 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1173,14 +1173,10 @@ The `topic` arg is typically a string but may be any hashable object. A #### Agent types -An `agent` may be any of the following: - -* `Queue` When a message is received it receives 2-tuple `(topic, message)`. If -extra args were passed on subscription the queue receives a 3-tuple. -`(topic, message, (args...))`. -* `RingbufQueue` When a message is received it receives 2-tuple `(topic, message)`. -If extra args were passed on subscription it receives a 3-tuple, -`(topic, message, (args...))`. +An `agent` may be an instance of any of the following: + +* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)`. +* `Queue` Received messages are queued as a 2-tuple `(topic, message)`. * `function` Called when a message is received. Args: topic, message plus any further args. * `bound method` Called when a message is received. Args: topic, message plus any @@ -1193,7 +1189,8 @@ message plus any further args. * `Event` Set when a message is received. Note that synchronous `agent` instances must run to completion quickly otherwise -the `publish` method will be slowed. +the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for +further details on queue behaviour. #### Broker class variable @@ -1202,24 +1199,25 @@ the `publish` method will be slowed. #### example ```py import asyncio -from primitives import Broker, Queue +from primitives import Broker, RingbufQueue broker = Broker() -queue = Queue() # Or (e.g. RingbufQueue(20)) +queue = RingbufQueue(20) async def sender(t): for x in range(t): await asyncio.sleep(1) broker.publish("foo_topic", f"test {x}") +async def receiver(): + async for topic, message in queue: + print(topic, message) + async def main(): broker.subscribe("foo_topic", queue) - n = 10 - asyncio.create_task(sender(n)) - print("Letting queue part-fill") - await asyncio.sleep(5) - for _ in range(n): - topic, message = await queue.get() - print(topic, message) + rx = asyncio.create_task(receiver()) + await sender(10) + await asyncio.sleep(2) + rx.cancel() asyncio.run(main()) ``` @@ -1236,7 +1234,8 @@ async def messages(client): Assuming the MQTT client is subscribed to multiple topics, message strings are directed to individual tasks each supporting one topic. -The following illustrates a use case for `agent` args. +The following illustrates a use case for passing args to an `agent` (pin nos. +are for Pyoard 1.1). ```py import asyncio from primitives import Broker @@ -1319,7 +1318,12 @@ If a message causes a queue to fill, a message will silently be lost. It is the responsibility of the subscriber to avoid this. In the case of a `Queue` instance the lost message is the one causing the overflow. In the case of `RingbufQueue` the oldest message in the queue is discarded. In some -applications this behaviour is preferable. +applications this behaviour is preferable. In general `RingbufQueue` is +preferred as it is optimised for microcontroller use and supports retrieval by +an asynchronous iterator. + +If either queue type is subscribed with args, publications will queued as a +3-tuple `(topic, message, (args...))`. There is no obvious use case for this. #### exceptions @@ -1327,9 +1331,10 @@ An instance of an `agent` objects is owned by a subscribing tasks but is executed by a publishing task. If a function used as an `agent` throws an exception, the traceback will point to a `Broker.publish` call. -The `Broker` class does not throw exceptions. There are a number of non-fatal -conditions which can occur such as a queue overflow or an attempt to unsubscribe -an `agent` twice. The `Broker` will report these if `Broker.Verboase=True`. +The `Broker` class throws a `ValueError` if `.subscribe` is called with an +invalid `agent` type. There are a number of non-fatal conditions which can occur +such as a queue overflow or an attempt to unsubscribe an `agent` twice. The +`Broker` will report these if `Broker.Verbose=True`. ###### [Contents](./DRIVERS.md#0-contents) diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index de88c58..c220c85 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -14,10 +14,22 @@ class Agent: pass +def _validate(a): + return ( + isinstance(a, asyncio.Event) + or isinstance(a, Queue) + or isinstance(a, RingbufQueue) + or isinstance(a, Agent) + or callable(a) + ) + + class Broker(dict): Verbose = True def subscribe(self, topic, agent, *args): + if not _validate(agent): + raise ValueError("Invalid agent:", agent) aa = (agent, args) if not (t := self.get(topic, False)): self[topic] = {aa} @@ -51,7 +63,7 @@ def publish(self, topic, message): try: agent.put_nowait(t if args else t[:2]) except Exception: # Queue discards current message. RingbufQueue discards oldest - Broker.verbose and print(f"Message lost topic {topic} message {message}") + Broker.Verbose and print(f"Message lost topic {topic} message {message}") continue # agent is function, method, coroutine or bound coroutine res = agent(topic, message, *args) diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index 4b01f0a..a2c09a3 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -59,6 +59,7 @@ async def print_ringbuf_q(q): async def main(): + Broker.Verbose = False # Suppress q full messages tc = TestClass() q = Queue(10) rq = RingbufQueue(10) @@ -96,8 +97,6 @@ async def main(): print() print("Unsubscribing method") broker.unsubscribe("foo_topic", tc.get_data) # Async method - # print("Pause 5s") - # await asyncio.sleep(5) print("Retrieving foo_topic messages from Queue") try: await asyncio.wait_for(print_queue(q), 5) @@ -108,9 +107,19 @@ async def main(): await asyncio.wait_for(print_ringbuf_q(rq), 5) except asyncio.TimeoutError: print("Timeout") - print("Check error on invalid unsubscribe") + print() + print("*** Testing error reports and exception ***") + print() + Broker.Verbose = True + print("*** Check error on invalid unsubscribe ***") broker.unsubscribe("rats", "more rats") # Invalid topic broker.unsubscribe("foo_topic", "rats") # Invalid agent + print("*** Check exception on invalid subscribe ***") + try: + broker.subscribe("foo_topic", "rubbish_agent") + print("Test FAIL") + except ValueError: + print("Test PASS") asyncio.run(main()) From e6f4a33f3587625515e08c7406017e3fc4db9501 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 15 Dec 2024 18:25:23 +0000 Subject: [PATCH 12/20] Docs: improve Broker coverage. --- v3/docs/DRIVERS.md | 38 ++++++++++++++++++++------------------ v3/docs/TUTORIAL.md | 26 ++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 60d78d7..4511e17 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1136,8 +1136,6 @@ finally: # 9. Message Broker -This is under development: please check for updates. - ```python from primitives import Broker # broker.py ``` @@ -1148,7 +1146,7 @@ message. This enables one to one, one to many, many to one or many to many messaging. A task subscribes to a topic with an `agent`. This is stored by the broker. When -the broker publishes a message, the `agent` of each task subscribed to its topic +the broker publishes a message, every `agent` subscribed to the message topic will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. @@ -1173,18 +1171,20 @@ The `topic` arg is typically a string but may be any hashable object. A #### Agent types -An `agent` may be an instance of any of the following: +An `agent` may be an instance of any of the following types. Args refers to any +arguments passed to the `agent`'s' subscription. -* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)`. +* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)` +assuming no args. * `Queue` Received messages are queued as a 2-tuple `(topic, message)`. -* `function` Called when a message is received. Args: topic, message plus any -further args. -* `bound method` Called when a message is received. Args: topic, message plus any +* `function` Called when a message is received. Args: `topic`, `message` plus any further args. -* `coroutine` Converted to a `task` when a message is received. Args: topic, -message plus any further args. -* `bound coroutine` Converted to a `task` when a message is received. Args: topic, -message plus any further args. +* `bound method` Called when a message is received. Args: `topic`, `message` +plus any further args. +* `coroutine` Converted to a `task` when a message is received. Args: `topic`, +`message` plus any further args. +* `bound coroutine` Converted to a `task` when a message is received. Args: `topic`, +`message` plus any further args. * `user_agent` Instance of a user class. See user agents below. * `Event` Set when a message is received. @@ -1232,7 +1232,8 @@ async def messages(client): broker.publish(topic.decode(), msg.decode()) ``` Assuming the MQTT client is subscribed to multiple topics, message strings are -directed to individual tasks each supporting one topic. +directed to agents, each dedicated to handling a topic. An `agent` might operate +an interface or queue the message for a running task. The following illustrates a use case for passing args to an `agent` (pin nos. are for Pyoard 1.1). @@ -1322,14 +1323,15 @@ applications this behaviour is preferable. In general `RingbufQueue` is preferred as it is optimised for microcontroller use and supports retrieval by an asynchronous iterator. -If either queue type is subscribed with args, publications will queued as a -3-tuple `(topic, message, (args...))`. There is no obvious use case for this. +If either queue type is subscribed with args, a publication will create a queue +entry that is a 3-tuple `(topic, message, (args...))`. There is no obvious use +case for this. #### exceptions -An instance of an `agent` objects is owned by a subscribing tasks but is -executed by a publishing task. If a function used as an `agent` throws an -exception, the traceback will point to a `Broker.publish` call. +An `agent` instance is owned by a subscribing tasks but is executed by a +publishing task. If a function used as an `agent` throws an exception, the +traceback will point to a `Broker.publish` call. The `Broker` class throws a `ValueError` if `.subscribe` is called with an invalid `agent` type. There are a number of non-fatal conditions which can occur diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index e516419..cd24841 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -45,7 +45,8 @@ import uasyncio as asyncio 3.7 [Barrier](./TUTORIAL.md#37-barrier) 3.8 [Delay_ms](./TUTORIAL.md#38-delay_ms-class) Software retriggerable delay. 3.9 [Message](./TUTORIAL.md#39-message) - 3.10 [Synchronising to hardware](./TUTORIAL.md#310-synchronising-to-hardware) + 3.10 [Message broker](./TUTORIAL.md#310-message-broker) A publish-subscribe model of messaging and control. + 3.11 [Synchronising to hardware](./TUTORIAL.md#311-synchronising-to-hardware) Debouncing switches, pushbuttons, ESP32 touchpads and encoder knobs. Taming ADC's. 4. [Designing classes for asyncio](./TUTORIAL.md#4-designing-classes-for-asyncio) 4.1 [Awaitable classes](./TUTORIAL.md#41-awaitable-classes) @@ -589,6 +590,8 @@ following classes which are non-standard, are also in that directory: in a similar (but not identical) way to `gather`. * `Delay_ms` A useful software-retriggerable monostable, akin to a watchdog. Calls a user callback if not cancelled or regularly retriggered. + * `RingbufQueue` a MicroPython-optimised queue. + * `Broker` a means of messaging and control based on a publish/subscribe model. A further set of primitives for synchronising hardware are detailed in [section 3.9](./TUTORIAL.md#39-synchronising-to-hardware). @@ -1280,7 +1283,26 @@ provide an object similar to `Event` with the following differences: It may be found in the `threadsafe` directory and is documented [here](./THREADING.md#32-message). -## 3.10 Synchronising to hardware +## 3.10 Message broker + +A `Broker` is a means of communicating data and/or control within or between +modules. It is typically a single global object, and uses a publish-subscribe +model. A publication comprises a `topic` and a `message`; the latter may be any +Python object. Tasks subscribe to a `topic` via an `agent` object. Whenever a +publication, occurs all `agent` instances currently subscribed to that topic are +triggered. + +An `agent` may be an instance of various types including a function, a coroutine +or a queue. + +A benefit of this approach is that the design of publishing tasks can proceed +independently from that of the subscribers; `agent` instances can be subscribed +and unsubscribed at run time with no effect on the publisher. The publisher +neither knows or cares about the type or number of subscribing `agent`s. + +This is [documented here](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/DRIVERS.md#9-message-broker). + +## 3.11 Synchronising to hardware The following hardware-related classes are documented [here](./DRIVERS.md): * `ESwitch` A debounced switch with an `Event` interface. From 3838e1f74d6c71de6625d2ee2defe6da97b9396c Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 20 Dec 2024 10:35:30 +0000 Subject: [PATCH 13/20] htu21d: I2C address is constructor arg (iss #130) --- v3/as_drivers/htu21d/htu21d_mc.py | 12 ++++++------ v3/docs/HTU21D.md | 7 ++++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/v3/as_drivers/htu21d/htu21d_mc.py b/v3/as_drivers/htu21d/htu21d_mc.py index 5d19ba0..385ec48 100644 --- a/v3/as_drivers/htu21d/htu21d_mc.py +++ b/v3/as_drivers/htu21d/htu21d_mc.py @@ -11,7 +11,6 @@ import asyncio from micropython import const -_ADDRESS = const(0x40) # HTU21D Address _PAUSE_MS = const(60) # HTU21D acquisition delay _READ_USER_REG = const(0xE7) @@ -25,10 +24,11 @@ class HTU21D: START_TEMP_MEASURE = b"\xF3" # Commands START_HUMD_MEASURE = b"\xF5" - def __init__(self, i2c, read_delay=10): + def __init__(self, i2c, read_delay=10, address=0x40): self.i2c = i2c - if _ADDRESS not in self.i2c.scan(): + if address not in self.i2c.scan(): raise OSError("No HTU21D device found.") + self.address = address self.temperature = None self.humidity = None asyncio.create_task(self._run(read_delay)) @@ -46,9 +46,9 @@ def __iter__(self): # Await 1st reading yield from asyncio.sleep(0) async def _get_data(self, cmd, divisor=0x131 << 15, bit=1 << 23): - self.i2c.writeto(_ADDRESS, cmd) # Start reading + self.i2c.writeto(self.address, cmd) # Start reading await asyncio.sleep_ms(_PAUSE_MS) # Wait for device - value = self.i2c.readfrom(_ADDRESS, 3) # Read result, check CRC8 + value = self.i2c.readfrom(self.address, 3) # Read result, check CRC8 data, crc = ustruct.unpack(">HB", value) remainder = (data << 8) | crc while bit > 128: @@ -61,4 +61,4 @@ async def _get_data(self, cmd, divisor=0x131 << 15, bit=1 << 23): return data & 0xFFFC # Clear the status bits def user_register(self): # Read the user register byte (should be 2) - return self.i2c.readfrom_mem(_ADDRESS, _READ_USER_REG, 1)[0] + return self.i2c.readfrom_mem(self.address, _READ_USER_REG, 1)[0] diff --git a/v3/docs/HTU21D.md b/v3/docs/HTU21D.md index daa29f3..07feb14 100644 --- a/v3/docs/HTU21D.md +++ b/v3/docs/HTU21D.md @@ -52,9 +52,10 @@ import as_drivers.htu21d.htu_test This provides a single class `HTU21D`. Constructor. -This takes two args, `i2c` (mandatory) and an optional `read_delay=10`. The -former must be an initialised I2C bus instance. The `read_delay` (secs) -determines how frequently the data values are updated. +This takes the following args +* `i2c` (mandatory) An initialised I2C bus instance. +* `read_delay=10`. The frequency (secs) at which data values are updated. +* `address=0x40` I2C address of the chip. Public bound values 1. `temperature` Latest value in Celcius. From 88a0446c393098abafb324da6d16d69e01ce3b73 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 29 Dec 2024 12:38:57 +0000 Subject: [PATCH 14/20] DRIVERS.md: Improvements to Broker docs. --- v3/docs/DRIVERS.md | 70 ++++++++++++++++++++++++++-------- v3/primitives/__init__.py | 4 +- v3/primitives/ringbuf_queue.py | 4 +- 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 4511e17..570ce47 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1141,11 +1141,10 @@ from primitives import Broker # broker.py ``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task -publishes to a topic. Any tasks subscribed to that topic will receive the -message. This enables one to one, one to many, many to one or many to many -messaging. +publishes to a topic. Objects subscribed to that topic will receive the message. +This enables one to one, one to many, many to one or many to many messaging. -A task subscribes to a topic with an `agent`. This is stored by the broker. When +A task subscribes to a topic via an `agent`. This is stored by the broker. When the broker publishes a message, every `agent` subscribed to the message topic will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. @@ -1153,10 +1152,15 @@ broker puts the topic and message onto the subscriber's queue for retrieval. More advanced agents can perform actions in response to a message, such as calling a function, launching a `task` or lighting an LED. +Agents may be subscribed and unsubscribed dynamically. The publishing task has +no "knowledge" of the number or type of agents subscribed to a topic. The module +is not threadsafe: `Broker` methods should not be called from a hard ISR or from +another thread. + #### Broker methods -All are synchronous. They are not threadsafe so should not be called from a hard -ISR or from another thread. The constructor has no args. +All are synchronous. +* Constructor This has no args. * `subscribe(topic, agent, *args)` Passed `agent` will be triggered by messages with a matching `topic`. Any additional args will be passed to the `agent` when it is triggered. @@ -1172,21 +1176,21 @@ The `topic` arg is typically a string but may be any hashable object. A #### Agent types An `agent` may be an instance of any of the following types. Args refers to any -arguments passed to the `agent`'s' subscription. +arguments passed to the `agent` on subscription. * `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)` -assuming no args. -* `Queue` Received messages are queued as a 2-tuple `(topic, message)`. +assuming no subscription args - otheriwse `(topic, message, (args...))`. +* `Queue` Received messages are queued as described above. * `function` Called when a message is received. Args: `topic`, `message` plus any -further args. +further subscription args. * `bound method` Called when a message is received. Args: `topic`, `message` plus any further args. * `coroutine` Converted to a `task` when a message is received. Args: `topic`, -`message` plus any further args. +`message` plus any further subscription args. * `bound coroutine` Converted to a `task` when a message is received. Args: `topic`, -`message` plus any further args. -* `user_agent` Instance of a user class. See user agents below. +`message` plus any further subscription args. * `Event` Set when a message is received. +* `user_agent` Instance of a user class. See user agents below. Note that synchronous `agent` instances must run to completion quickly otherwise the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for @@ -1202,18 +1206,18 @@ import asyncio from primitives import Broker, RingbufQueue broker = Broker() -queue = RingbufQueue(20) async def sender(t): for x in range(t): await asyncio.sleep(1) broker.publish("foo_topic", f"test {x}") async def receiver(): + queue = RingbufQueue(20) + broker.subscribe("foo_topic", queue) async for topic, message in queue: print(topic, message) async def main(): - broker.subscribe("foo_topic", queue) rx = asyncio.create_task(receiver()) await sender(10) await asyncio.sleep(2) @@ -1266,6 +1270,40 @@ async def main(): asyncio.run(main()) ``` +A task can wait on multiple topics using a `RingbufQueue`: +```python +import asyncio +from primitives import Broker, RingbufQueue + +broker = Broker() + +async def receiver(): + q = RingbufQueue(10) + broker.subscribe("foo_topic", q) + broker.subscribe("bar_topic", q) + async for topic, message in q: + print(f"Received Topic: {topic} Message: {message}") + + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + broker.publish("bar_topic", f"test {x}") + broker.publish("ignore me", f"test {x}") + + +async def main(): + rx = asyncio.create_task(receiver()) + await sender(10) + await asyncio.sleep(2) + rx.cancel() + + +asyncio.run(main()) +``` +here the `receiver` task waits on two topics. The asynchronous iterator returns +messages as they are published. ## 9.2 User agents @@ -1298,7 +1336,7 @@ asyncio.run(main()) #### The publish/subscribe model -As in the real world publication carries no guarantee of reception. If at the +As in the real world, publication carries no guarantee of readership. If at the time of publication there are no tasks with subscribed `agent` instances, the message will silently be lost. diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index a6914f9..2dabb4d 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -1,6 +1,6 @@ # __init__.py Common functions for uasyncio primitives -# Copyright (c) 2018-2022 Peter Hinch +# Copyright (c) 2018-2024 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file import asyncio @@ -57,7 +57,7 @@ def _handle_exception(loop, context): "Agent": "broker", } -# Copied from uasyncio.__init__.py +# Copied from asyncio.__init__.py # Lazy loader, effectively does: # global attr # from .mod import attr diff --git a/v3/primitives/ringbuf_queue.py b/v3/primitives/ringbuf_queue.py index d2b6f90..eaf7ad3 100644 --- a/v3/primitives/ringbuf_queue.py +++ b/v3/primitives/ringbuf_queue.py @@ -7,8 +7,8 @@ # Uses pre-allocated ring buffer: can use list or array # Asynchronous iterator allowing consumer to use async for # put_nowait QueueFull exception can be ignored allowing oldest data to be discarded - -# this is not thread safe, however the class as a whole is not TS because of its -# use of Event objects. +# this is not thread safe. Nor is the class as a whole TS because of its use of +# Event objects. import asyncio From 6c59025349fa521ebecd9abf8eac93e2cb105225 Mon Sep 17 00:00:00 2001 From: Jeff Otterson Date: Thu, 2 Jan 2025 16:42:16 -0500 Subject: [PATCH 15/20] set dirty to false sooner to mitigate race condition that can happen during rapid updated --- v3/as_drivers/hd44780/alcd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/as_drivers/hd44780/alcd.py b/v3/as_drivers/hd44780/alcd.py index 2b1c76c..908a322 100644 --- a/v3/as_drivers/hd44780/alcd.py +++ b/v3/as_drivers/hd44780/alcd.py @@ -100,9 +100,9 @@ async def runlcd(self): # Periodically check for changed text and update LCD if for row in range(self.rows): if self.dirty[row]: msg = self[row] + self.dirty[row] = False self.lcd_byte(LCD.LCD_LINES[row], LCD.CMD) for thisbyte in msg: self.lcd_byte(ord(thisbyte), LCD.CHR) await asyncio.sleep_ms(0) # Reshedule ASAP - self.dirty[row] = False await asyncio.sleep_ms(20) # Give other coros a look-in From cd56ee035804295d742d36f0adf7101a37e00be7 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 10 Jan 2025 10:19:02 +0000 Subject: [PATCH 16/20] Tutorial: add link to event based programming doc. --- v3/docs/TUTORIAL.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index cd24841..b818d58 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -797,7 +797,9 @@ wa = WaitAll((evt1, evt2)).wait() await wa # Both were triggered ``` -Awaiting `WaitAll` or `WaitAny` may be cancelled or subject to a timeout. +Awaiting `WaitAll` or `WaitAny` may be cancelled or subject to a timeout. These +primitives are documented in +[event baed programming](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/EVENTS.md). ###### [Contents](./TUTORIAL.md#contents) From 197c2b5d72cc7633e4b3176eabdeef532ea09ffd Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 11 Jan 2025 18:08:12 +0000 Subject: [PATCH 17/20] Broker: add wildcard subscriptions. --- v3/docs/DRIVERS.md | 22 ++++++++++++++++++---- v3/primitives/__init__.py | 1 + v3/primitives/broker.py | 17 ++++++++++++++++- v3/primitives/tests/broker_test.py | 30 ++++++++++++++++++------------ 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 570ce47..d1570b8 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -26,11 +26,11 @@ MicroPython's `asyncio` when used in a microcontroller context. 6.1 [Encoder class](./DRIVERS.md#61-encoder-class) 7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive. 8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface. - 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between - tasks. + 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between tasks. 9.1 [Further examples](./DRIVERS.md#91-further-examples) 9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes. - 9.3 [Notes](./DRIVERS.md#93-notes) + 9.3 [Wildcard subscriptions](./DRIVERS.md#93-wildcard-subscriptions) + 9.4 [Notes](./DRIVERS.md#9-notes) 10. [Additional functions](./DRIVERS.md#10-additional-functions) 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. @@ -1332,7 +1332,21 @@ async def main(): asyncio.run(main()) ``` -## 9.3 Notes +## 9.3 Wildcard subscriptions + +In the case of publications whose topics are strings, a single call to +`.subscribe` can subscribe an `agent` to multiple topics. This is by wildcard +matching. By default exact matching is used, however this can be changed to use +regular expressions as in this code fragment: +```py +from primitives import Broker, RegExp +broker.subscribe(RegExp(".*_topic"), some_agent) +``` +In this case `some_agent` would be triggered by publications to `foo_topic` or +`bar_topic` because the string `".*_topic"` matches these by the rules of +regular expressions. + +## 9.4 Notes #### The publish/subscribe model diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index 2dabb4d..fe15c4f 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -55,6 +55,7 @@ def _handle_exception(loop, context): "SwArray": "sw_array", "Broker": "broker", "Agent": "broker", + "RegExp": "broker", } # Copied from asyncio.__init__.py diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index c220c85..4ee9b37 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -8,12 +8,21 @@ import asyncio from primitives import Queue, RingbufQueue, type_coro +import re class Agent: pass +class RegExp: + def __init__(self, re_str): + self.re = re.compile(re_str) + + def matching(self, topic): + return re.match(self.re, topic) is not None + + def _validate(a): return ( isinstance(a, asyncio.Event) @@ -50,7 +59,13 @@ def unsubscribe(self, topic, agent, *args): print(f"Unsubscribe topic {topic} fail: topic not subscribed.") def publish(self, topic, message): - agents = self.get(topic, []) + agents = set() # Agents which are triggered by this topic + if isinstance(topic, str): # Check regexps + # Are any keys RegExp instances? + for regexp in [k for k in self.keys() if isinstance(k, RegExp)]: + if regexp.matching(topic): + agents.update(self[regexp]) # Append matching agents + agents.update(self.get(topic, [])) # Exact match for agent, args in agents: if isinstance(agent, asyncio.Event): agent.set() diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index a2c09a3..50fc87c 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -3,7 +3,7 @@ # import primitives.tests.broker_test import asyncio -from primitives import Broker, Queue, RingbufQueue +from primitives import Broker, Queue, RingbufQueue, RegExp broker = Broker() @@ -49,12 +49,13 @@ def get_data(self, topic, message): async def print_queue(q): while True: - topic, message = await q.get() + topic, message = await asyncio.wait_for(q.get(), 2) print(topic, message) async def print_ringbuf_q(q): - async for topic, message, args in q: + while True: + topic, message, args = await asyncio.wait_for(q.get(), 2) print(topic, message, args) @@ -98,20 +99,19 @@ async def main(): print("Unsubscribing method") broker.unsubscribe("foo_topic", tc.get_data) # Async method print("Retrieving foo_topic messages from Queue") - try: - await asyncio.wait_for(print_queue(q), 5) - except asyncio.TimeoutError: - print("Timeout") print("Retrieving bar_topic messages from RingbufQueue") - try: - await asyncio.wait_for(print_ringbuf_q(rq), 5) - except asyncio.TimeoutError: - print("Timeout") + await asyncio.gather(print_queue(q), print_ringbuf_q(rq), return_exceptions=True) + # Queues are now empty print() + print("*** Unsubscribing queues ***") + broker.unsubscribe("foo_topic", q) + broker.unsubscribe("bar_topic", rq, "args", "added") + print() + print("*** Testing error reports and exception ***") print() Broker.Verbose = True - print("*** Check error on invalid unsubscribe ***") + print("*** Produce warning messages on invalid unsubscribe ***") broker.unsubscribe("rats", "more rats") # Invalid topic broker.unsubscribe("foo_topic", "rats") # Invalid agent print("*** Check exception on invalid subscribe ***") @@ -120,6 +120,12 @@ async def main(): print("Test FAIL") except ValueError: print("Test PASS") + print() + print("*** Test wildcard subscribe ***") + broker.subscribe(RegExp(".*_topic"), func) + broker.publish("FAIL", func) # No match + asyncio.create_task(test(5)) + await asyncio.sleep(10) asyncio.run(main()) From 8e01d4287b716f1513a33397074ae6e085ecbe4c Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 30 Mar 2025 12:28:11 +0100 Subject: [PATCH 18/20] DRIVERS.md: Add note re Delay_ms.deinit(). --- v3/docs/DRIVERS.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index d1570b8..aebb691 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1070,7 +1070,9 @@ Synchronous methods: the `Task` instance. This allows the `Task` to be cancelled or awaited. 6. `callback` args `func=None`, `args=()`. Allows the callable and its args to be assigned, reassigned or disabled at run time. - 7. `deinit` No args. Cancels the running task. See [Object scope](./TUTORIAL.md#44-object-scope). + 7. `deinit` No args. Cancels the running task. To avoid a memory leak this + should be called before allowing a `Delay_ms` object to go out of scope. See + [Object scope](./TUTORIAL.md#44-object-scope). 8. `clear` No args. Clears the `Event` described in `wait` below. 9. `set` No args. Sets the `Event` described in `wait` below. From 4001e28f402b9567d9e01318ba2e30d28151d6d1 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Thu, 8 May 2025 18:35:17 +0100 Subject: [PATCH 19/20] broker.py: Provide instance. Pub message defaults to None. --- v3/docs/DRIVERS.md | 40 ++++++++++++++++-------------- v3/primitives/__init__.py | 1 + v3/primitives/broker.py | 9 ++++--- v3/primitives/tests/broker_test.py | 4 +-- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index aebb691..dcaf218 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -1139,14 +1139,15 @@ finally: # 9. Message Broker ```python -from primitives import Broker # broker.py +from primitives import Broker, broker # broker.py ``` The `Broker` class provides a flexible means of messaging between running tasks. It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task publishes to a topic. Objects subscribed to that topic will receive the message. This enables one to one, one to many, many to one or many to many messaging. -A task subscribes to a topic via an `agent`. This is stored by the broker. When +A task subscribes to a topic via an `agent`: this term describes a set of Python +types which may be used in this role. An `agent` is stored by the broker. When the broker publishes a message, every `agent` subscribed to the message topic will be triggered. In the simplest case the `agent` is a `Queue` instance: the broker puts the topic and message onto the subscriber's queue for retrieval. @@ -1159,6 +1160,12 @@ no "knowledge" of the number or type of agents subscribed to a topic. The module is not threadsafe: `Broker` methods should not be called from a hard ISR or from another thread. +A `Broker` instance `broker` is provided. Where multiple modules issue +```python +from primitives import broker +``` +all will see the same instance, facilitating message passing between modules. + #### Broker methods All are synchronous. @@ -1168,12 +1175,17 @@ with a matching `topic`. Any additional args will be passed to the `agent` when it is triggered. * `unsubscribe(topic, agent, *args)` The `agent` will stop being triggered. If args were passed on subscription, the same args must be passed. -* `publish(topic, message)` All `agent` instances subscribed to `topic` will be -triggered, receiving `topic` and `message` plus any further args that were -passed to `subscribe`. +* `publish(topic, message=None)` All `agent` instances subscribed to `topic` +will be triggered, receiving `topic` and `message` plus any further args that +were passed to `subscribe`. The `topic` arg is typically a string but may be any hashable object. A -`message` is an arbitrary Python object. +`message` is an arbitrary Python object. Where string topics are used, wildcard +subscriptions are possible. + +#### Broker class variable + +* `Verbose=True` Enables printing of debug messages. #### Agent types @@ -1198,16 +1210,11 @@ Note that synchronous `agent` instances must run to completion quickly otherwise the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for further details on queue behaviour. -#### Broker class variable - -* `Verbose=True` Enables printing of debug messages. - #### example ```py import asyncio -from primitives import Broker, RingbufQueue +from primitives import broker, RingbufQueue -broker = Broker() async def sender(t): for x in range(t): await asyncio.sleep(1) @@ -1245,11 +1252,10 @@ The following illustrates a use case for passing args to an `agent` (pin nos. are for Pyoard 1.1). ```py import asyncio -from primitives import Broker +from primitives import broker from machine import Pin red = Pin("A13", Pin.OUT, value=0) # Pin nos. for Pyboard V1.1 green = Pin("A14", Pin.OUT, value=0) -broker = Broker() async def flash(): broker.publish("led", 1) @@ -1275,9 +1281,8 @@ asyncio.run(main()) A task can wait on multiple topics using a `RingbufQueue`: ```python import asyncio -from primitives import Broker, RingbufQueue +from primitives import broker, RingbufQueue -broker = Broker() async def receiver(): q = RingbufQueue(10) @@ -1316,9 +1321,8 @@ should run to completion quickly. ```py import asyncio -from primitives import Broker, Agent +from primitives import broker, Agent -broker = Broker() class MyAgent(Agent): def put(sef, topic, message, arg): print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}") diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index fe15c4f..ceaad77 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -54,6 +54,7 @@ def _handle_exception(loop, context): "Keyboard": "sw_array", "SwArray": "sw_array", "Broker": "broker", + "broker": "broker", "Agent": "broker", "RegExp": "broker", } diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py index 4ee9b37..73072bb 100644 --- a/v3/primitives/broker.py +++ b/v3/primitives/broker.py @@ -1,6 +1,6 @@ # broker.py A message broker for MicroPython -# Copyright (c) 2024 Peter Hinch +# Copyright (c) 2024-2025 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file # Inspired by the following @@ -11,7 +11,7 @@ import re -class Agent: +class Agent: # ABC for user agent pass @@ -58,7 +58,7 @@ def unsubscribe(self, topic, agent, *args): elif Broker.Verbose: print(f"Unsubscribe topic {topic} fail: topic not subscribed.") - def publish(self, topic, message): + def publish(self, topic, message=None): agents = set() # Agents which are triggered by this topic if isinstance(topic, str): # Check regexps # Are any keys RegExp instances? @@ -84,3 +84,6 @@ def publish(self, topic, message): res = agent(topic, message, *args) if isinstance(res, type_coro): asyncio.create_task(res) + + +broker = Broker() diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py index 50fc87c..ad1357e 100644 --- a/v3/primitives/tests/broker_test.py +++ b/v3/primitives/tests/broker_test.py @@ -3,9 +3,7 @@ # import primitives.tests.broker_test import asyncio -from primitives import Broker, Queue, RingbufQueue, RegExp - -broker = Broker() +from primitives import broker, Queue, RingbufQueue, RegExp # Periodically publish messages to two topics async def test(t): From 84c70cbf49ba3b69ffb59fc8f3cb5e0c9f41d4b0 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Thu, 15 May 2025 10:50:04 +0100 Subject: [PATCH 20/20] INTERRUPTS.md: fix garbled text. --- v3/docs/INTERRUPTS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/docs/INTERRUPTS.md b/v3/docs/INTERRUPTS.md index 138a2b0..a2c684c 100644 --- a/v3/docs/INTERRUPTS.md +++ b/v3/docs/INTERRUPTS.md @@ -8,7 +8,7 @@ interrupts in `asyncio` applications. Writing an interrupt service routine (ISR) requires care: see the [official docs](https://docs.micropython.org/en/latest/reference/isr_rules.html). There are restrictions (detailed below) on the way an ISR can interface with -`asyncio`. Finally, on many platformasyncioupts are a limited resource. In +`asyncio`. Finally, on many platforms interrupts are a limited resource. In short interrupts are extremely useful but, if a practical alternative exists, it should be seriously considered.